You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/13 18:33:18 UTC
incubator-gobblin git commit: [GOBBLIN-533] upgrade helix to 0.8.1
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 3bc3d3691 -> 6232b416a
[GOBBLIN-533] upgrade helix to 0.8.1
Closes #2396 from arjun4084346/helixUpgrade
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6232b416
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6232b416
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6232b416
Branch: refs/heads/master
Commit: 6232b416a59613db32f6c08cde703c71555d37b8
Parents: 3bc3d36
Author: Arjun <ab...@linkedin.com>
Authored: Fri Jul 13 11:33:11 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jul 13 11:33:11 2018 -0700
----------------------------------------------------------------------
.../gobblin/aws/CloudInitScriptBuilder.java | 7 +-
.../gobblin/aws/CloudInitScriptBuilderTest.java | 31 +++++++-
...blinHelixDistributeJobExecutionLauncher.java | 2 +-
.../cluster/GobblinHelixJobLauncher.java | 2 +-
.../gobblin/cluster/GobblinHelixTaskDriver.java | 80 --------------------
.../org/apache/gobblin/cluster/HelixUtils.java | 25 +-----
gradle/scripts/dependencyDefinitions.gradle | 2 +-
7 files changed, 35 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
index fa65f5b..8da15aa 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
@@ -53,6 +53,7 @@ public class CloudInitScriptBuilder {
private static final String NFS_SERVER_START_CMD = "/etc/init.d/nfs start";
private static final String NFS_EXPORT_FS_CMD = "exportfs -a";
private static final String NFS_TYPE_4 = "nfs4";
+ public static final String BASH = "#!/bin/bash";
/***
* This method generates the script that would be executed by cloud-init module in EC2 instance
@@ -87,7 +88,7 @@ public class CloudInitScriptBuilder {
String masterS3ConfUri, String masterS3ConfFiles,
String masterS3JarsUri, String masterS3JarsFiles, String masterJarsDir,
String masterJvmMemory, Optional<String> masterJvmArgs, Optional<String> gobblinVersion) {
- final StringBuilder cloudInitCmds = new StringBuilder().append("#!/bin/bash").append("\n");
+ final StringBuilder cloudInitCmds = new StringBuilder().append(BASH).append("\n");
final String clusterMasterClassName = GobblinAWSClusterManager.class.getSimpleName();
@@ -196,7 +197,7 @@ public class CloudInitScriptBuilder {
String workerS3ConfUri, String workerS3ConfFiles,
String workerS3JarsUri, String workerS3JarsFiles, String workerJarsDir,
String workerJvmMemory, Optional<String> workerJvmArgs, Optional<String> gobblinVersion) {
- final StringBuilder cloudInitCmds = new StringBuilder().append("#!/bin/bash").append("\n");
+ final StringBuilder cloudInitCmds = new StringBuilder().append(BASH).append("\n");
final String clusterWorkerClassName = GobblinAWSTaskRunner.class.getSimpleName();
@@ -262,7 +263,7 @@ public class CloudInitScriptBuilder {
.append(clusterWorkerClassName).append(".")
.append("$pi").append(".")
.append(CloudInitScriptBuilder.STDERR);
- cloudInitCmds.append(launchGobblinClusterWorkerCmd);
+ cloudInitCmds.append(launchGobblinClusterWorkerCmd).append("\n");
final String cloudInitScript = cloudInitCmds.toString();
LOGGER.info("Cloud-init script for worker node: " + cloudInitScript);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java b/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
index a601991..cb1c70f 100644
--- a/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
+++ b/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
@@ -17,11 +17,16 @@
package org.apache.gobblin.aws;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.junit.BeforeClass;
import org.testng.Assert;
import org.testng.annotations.Test;
+import org.testng.util.Strings;
import com.google.common.base.Optional;
@@ -64,10 +69,8 @@ public class CloudInitScriptBuilderTest {
@BeforeClass
public void setup() throws Exception {
- this.expectedMasterCloudInitScript = IOUtils.toString(GobblinAWSClusterLauncherTest.class.getClassLoader()
- .getResourceAsStream(MASTER_CLOUD_INIT_SCRIPT), "UTF-8");
- this.expectedWorkerCloudInitScript = IOUtils.toString(GobblinAWSClusterLauncherTest.class.getClassLoader()
- .getResourceAsStream(WORKER_CLOUD_INIT_SCRIPT), "UTF-8");
+ this.expectedMasterCloudInitScript = loadFile(MASTER_CLOUD_INIT_SCRIPT);
+ this.expectedWorkerCloudInitScript = loadFile(WORKER_CLOUD_INIT_SCRIPT);
}
@Test
@@ -93,4 +96,24 @@ public class CloudInitScriptBuilderTest {
Assert.assertEquals(decodedScript, this.expectedWorkerCloudInitScript,
"Worker launcher cloud-init script not built as expected");
}
+
+ /**
+ * loads the given file into a string, ignoring the comments, but considering "#!/bin/bash"
+ * @param file file to read
+ * @return file content as a string
+ * @throws IOException
+ */
+ private String loadFile(String file) throws IOException {
+ StringBuilder sb = new StringBuilder();
+
+ List<String> lines = IOUtils
+ .readLines(new InputStreamReader(GobblinAWSClusterLauncherTest.class.getClassLoader().getResourceAsStream(file), "UTF-8"));
+
+ for (String line : lines) {
+ if (line.equals(CloudInitScriptBuilder.BASH) || (!line.startsWith("#") && !Strings.isNullOrEmpty(line))) {
+ sb.append(line).append("\n");
+ }
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 5800e8d..87613a6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -231,7 +231,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
return getResultFromUserContent();
} catch (TimeoutException te) {
- HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, planningName, 10L);
+ helixTaskDriver.waitToStop(planningName, 10L);
this.helixTaskDriver.delete(planningName);
this.helixTaskDriver.resume(planningName);
log.info("stopped the queue, deleted the job");
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 231575e..39c6e5b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -368,7 +368,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
this.jobContext.getJobId(),
timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
} catch (TimeoutException te) {
- HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, helixQueueName, 10L);
+ helixTaskDriver.waitToStop(helixQueueName, 10L);
try {
cancelJob(this.jobListener);
} catch (JobException e) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
deleted file mode 100644
index ebe2b52..0000000
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.cluster;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.HelixPropertyStore;
-import org.apache.helix.task.TargetState;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-
-/**
- * #HELIX-0.6.7-WORKAROUND
- * Replacement TaskDriver methods to workaround bugs and changes in behavior for the 0.6.7 upgrade
- */
-public class GobblinHelixTaskDriver {
- private final TaskDriver _taskDriver;
-
- public GobblinHelixTaskDriver(HelixManager manager) {
- this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager
- .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName());
- }
-
- public GobblinHelixTaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
- HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
- _taskDriver = new TaskDriver(admin, accessor, cfgAccessor, propertyStore, clusterName);
- }
-
- /**
- * Delete the workflow
- *
- * @param workflow The workflow name
- * @param timeout The timeout for deleting the workflow/queue in seconds
- */
- public void deleteWorkflow(String workflow, long timeout) throws InterruptedException {
- WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(workflow);
-
- // set the target state if not already set
- if (workflowConfig != null && workflowConfig.getTargetState() != TargetState.DELETE) {
- _taskDriver.delete(workflow);
- }
-
- long endTime = System.currentTimeMillis() + (timeout * 1000);
-
- // check for completion of deletion request
- while (System.currentTimeMillis() <= endTime) {
- WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow);
-
- if (workflowContext != null) {
- Thread.sleep(1000);
- } else {
- // Successfully deleted
- return;
- }
- }
-
- // Failed to complete deletion within timeout
- throw new HelixException(String
- .format("Fail to delete the workflow/queue %s within %d seconds.", workflow, timeout));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 0c5dbec..ea4e6f7 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -101,8 +101,7 @@ public class HelixUtils {
// If the queue is present, but in delete state then wait for cleanup before recreating the queue
if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
- GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(helixManager);
- gobblinHelixTaskDriver.deleteWorkflow(queueName, jobQueueDeleteTimeoutSeconds);
+ new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds);
// if we get here then the workflow was successfully deleted
workflowConfig = null;
}
@@ -147,26 +146,4 @@ public class HelixUtils {
throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired.");
}
-
- /**
- * Because fix https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4
- * is not available in currently used version 0.6.9
- */
- public static void helixTaskDriverWaitToStop(
- HelixManager helixManager,
- TaskDriver helixTaskDriver,
- String queueName,
- long timeoutInSeconds) throws InterruptedException {
- helixTaskDriver.stop(queueName);
- long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
- while (System.currentTimeMillis() <= endTime) {
- WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName);
- if (workflowContext == null || workflowContext.getWorkflowState()
- .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) {
- Thread.sleep(1000);
- } else {
- log.info("Successfully stopped the queue");
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index ad1c1cf..c1f53db 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -61,7 +61,7 @@ ext.externalDependency = [
"hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion,
"hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion,
"hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
- "helix": "org.apache.helix:helix-core:0.6.9",
+ "helix": "org.apache.helix:helix-core:0.8.1",
"hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
"hiveService": "org.apache.hive:hive-service:" + hiveVersion,
"hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,