You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/13 18:33:18 UTC

incubator-gobblin git commit: [GOBBLIN-533] upgrade helix to 0.8.1

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 3bc3d3691 -> 6232b416a


[GOBBLIN-533] upgrade helix to 0.8.1

Closes #2396 from arjun4084346/helixUpgrade


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6232b416
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6232b416
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6232b416

Branch: refs/heads/master
Commit: 6232b416a59613db32f6c08cde703c71555d37b8
Parents: 3bc3d36
Author: Arjun <ab...@linkedin.com>
Authored: Fri Jul 13 11:33:11 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Jul 13 11:33:11 2018 -0700

----------------------------------------------------------------------
 .../gobblin/aws/CloudInitScriptBuilder.java     |  7 +-
 .../gobblin/aws/CloudInitScriptBuilderTest.java | 31 +++++++-
 ...blinHelixDistributeJobExecutionLauncher.java |  2 +-
 .../cluster/GobblinHelixJobLauncher.java        |  2 +-
 .../gobblin/cluster/GobblinHelixTaskDriver.java | 80 --------------------
 .../org/apache/gobblin/cluster/HelixUtils.java  | 25 +-----
 gradle/scripts/dependencyDefinitions.gradle     |  2 +-
 7 files changed, 35 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
index fa65f5b..8da15aa 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/CloudInitScriptBuilder.java
@@ -53,6 +53,7 @@ public class CloudInitScriptBuilder {
   private static final String NFS_SERVER_START_CMD = "/etc/init.d/nfs start";
   private static final String NFS_EXPORT_FS_CMD = "exportfs -a";
   private static final String NFS_TYPE_4 = "nfs4";
+  public static final String BASH = "#!/bin/bash";
 
   /***
    * This method generates the script that would be executed by cloud-init module in EC2 instance
@@ -87,7 +88,7 @@ public class CloudInitScriptBuilder {
       String masterS3ConfUri, String masterS3ConfFiles,
       String masterS3JarsUri, String masterS3JarsFiles, String masterJarsDir,
       String masterJvmMemory, Optional<String> masterJvmArgs, Optional<String> gobblinVersion) {
-    final StringBuilder cloudInitCmds = new StringBuilder().append("#!/bin/bash").append("\n");
+    final StringBuilder cloudInitCmds = new StringBuilder().append(BASH).append("\n");
 
     final String clusterMasterClassName = GobblinAWSClusterManager.class.getSimpleName();
 
@@ -196,7 +197,7 @@ public class CloudInitScriptBuilder {
       String workerS3ConfUri, String workerS3ConfFiles,
       String workerS3JarsUri, String workerS3JarsFiles, String workerJarsDir,
       String workerJvmMemory, Optional<String> workerJvmArgs, Optional<String> gobblinVersion) {
-    final StringBuilder cloudInitCmds = new StringBuilder().append("#!/bin/bash").append("\n");
+    final StringBuilder cloudInitCmds = new StringBuilder().append(BASH).append("\n");
 
     final String clusterWorkerClassName = GobblinAWSTaskRunner.class.getSimpleName();
 
@@ -262,7 +263,7 @@ public class CloudInitScriptBuilder {
         .append(clusterWorkerClassName).append(".")
         .append("$pi").append(".")
         .append(CloudInitScriptBuilder.STDERR);
-    cloudInitCmds.append(launchGobblinClusterWorkerCmd);
+    cloudInitCmds.append(launchGobblinClusterWorkerCmd).append("\n");
 
     final String cloudInitScript = cloudInitCmds.toString();
     LOGGER.info("Cloud-init script for worker node: " + cloudInitScript);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java b/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
index a601991..cb1c70f 100644
--- a/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
+++ b/gobblin-aws/src/test/java/org/apache/gobblin/aws/CloudInitScriptBuilderTest.java
@@ -17,11 +17,16 @@
 
 package org.apache.gobblin.aws;
 
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.IOUtils;
 import org.junit.BeforeClass;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import org.testng.util.Strings;
 
 import com.google.common.base.Optional;
 
@@ -64,10 +69,8 @@ public class CloudInitScriptBuilderTest {
 
   @BeforeClass
   public void setup() throws Exception {
-    this.expectedMasterCloudInitScript = IOUtils.toString(GobblinAWSClusterLauncherTest.class.getClassLoader()
-        .getResourceAsStream(MASTER_CLOUD_INIT_SCRIPT), "UTF-8");
-    this.expectedWorkerCloudInitScript = IOUtils.toString(GobblinAWSClusterLauncherTest.class.getClassLoader()
-        .getResourceAsStream(WORKER_CLOUD_INIT_SCRIPT), "UTF-8");
+    this.expectedMasterCloudInitScript = loadFile(MASTER_CLOUD_INIT_SCRIPT);
+    this.expectedWorkerCloudInitScript = loadFile(WORKER_CLOUD_INIT_SCRIPT);
   }
 
   @Test
@@ -93,4 +96,24 @@ public class CloudInitScriptBuilderTest {
     Assert.assertEquals(decodedScript, this.expectedWorkerCloudInitScript,
         "Worker launcher cloud-init script not built as expected");
   }
+
+  /**
+   * Loads a classpath resource into a string, keeping only the "#!/bin/bash" line and non-empty lines that do not start with "#".
+   * @param file name of the classpath resource to read (decoded as UTF-8)
+   * @return the retained lines in their original order, each followed by "\n"
+   * @throws IOException if reading the resource fails
+   */
+  private String loadFile(String file) throws IOException {
+    StringBuilder sb = new StringBuilder();
+
+    List<String> lines = IOUtils
+        .readLines(new InputStreamReader(GobblinAWSClusterLauncherTest.class.getClassLoader().getResourceAsStream(file), "UTF-8"));
+
+    for (String line : lines) {
+      if (line.equals(CloudInitScriptBuilder.BASH) || (!line.startsWith("#") && !Strings.isNullOrEmpty(line))) {
+        sb.append(line).append("\n");
+      }
+    }
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
index 5800e8d..87613a6 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java
@@ -231,7 +231,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
           timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
       return getResultFromUserContent();
     } catch (TimeoutException te) {
-      HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, planningName, 10L);
+      helixTaskDriver.waitToStop(planningName, 10L);
       this.helixTaskDriver.delete(planningName);
       this.helixTaskDriver.resume(planningName);
       log.info("stopped the queue, deleted the job");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index 231575e..39c6e5b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -368,7 +368,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
           this.jobContext.getJobId(),
           timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty());
     } catch (TimeoutException te) {
-      HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, helixQueueName, 10L);
+      helixTaskDriver.waitToStop(helixQueueName, 10L);
       try {
         cancelJob(this.jobListener);
       } catch (JobException e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
deleted file mode 100644
index ebe2b52..0000000
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.cluster;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.HelixPropertyStore;
-import org.apache.helix.task.TargetState;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-
-/**
- * #HELIX-0.6.7-WORKAROUND
- * Replacement TaskDriver methods to workaround bugs and changes in behavior for the 0.6.7 upgrade
- */
-public class GobblinHelixTaskDriver {
-  private final TaskDriver _taskDriver;
-
-  public GobblinHelixTaskDriver(HelixManager manager) {
-    this(manager.getClusterManagmentTool(), manager.getHelixDataAccessor(), manager
-        .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName());
-  }
-
-  public GobblinHelixTaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor,
-      HelixPropertyStore<ZNRecord> propertyStore, String clusterName) {
-    _taskDriver = new TaskDriver(admin, accessor, cfgAccessor, propertyStore, clusterName);
-  }
-
-  /**
-   * Delete the workflow
-   *
-   * @param workflow  The workflow name
-   * @param timeout   The timeout for deleting the workflow/queue in seconds
-   */
-  public void deleteWorkflow(String workflow, long timeout) throws InterruptedException {
-    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(workflow);
-
-    // set the target state if not already set
-    if (workflowConfig != null && workflowConfig.getTargetState() != TargetState.DELETE) {
-      _taskDriver.delete(workflow);
-    }
-
-    long endTime = System.currentTimeMillis() + (timeout * 1000);
-
-    // check for completion of deletion request
-    while (System.currentTimeMillis() <= endTime) {
-      WorkflowContext workflowContext = _taskDriver.getWorkflowContext(workflow);
-
-      if (workflowContext != null) {
-        Thread.sleep(1000);
-      } else {
-        // Successfully deleted
-        return;
-      }
-    }
-
-    // Failed to complete deletion within timeout
-    throw new HelixException(String
-        .format("Fail to delete the workflow/queue %s within %d seconds.", workflow, timeout));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 0c5dbec..ea4e6f7 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -101,8 +101,7 @@ public class HelixUtils {
 
     // If the queue is present, but in delete state then wait for cleanup before recreating the queue
     if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) {
-      GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(helixManager);
-      gobblinHelixTaskDriver.deleteWorkflow(queueName, jobQueueDeleteTimeoutSeconds);
+      new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds);
       // if we get here then the workflow was successfully deleted
       workflowConfig = null;
     }
@@ -147,26 +146,4 @@ public class HelixUtils {
 
     throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired.");
   }
-
-  /**
-   * Because fix https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4
-   * is not available in currently used version 0.6.9
-   */
-  public static void helixTaskDriverWaitToStop(
-      HelixManager helixManager,
-      TaskDriver helixTaskDriver,
-      String queueName,
-      long timeoutInSeconds) throws InterruptedException {
-    helixTaskDriver.stop(queueName);
-    long endTime = System.currentTimeMillis() + timeoutInSeconds*1000;
-    while (System.currentTimeMillis() <= endTime) {
-      WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName);
-      if (workflowContext == null || workflowContext.getWorkflowState()
-          .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) {
-        Thread.sleep(1000);
-      } else {
-        log.info("Successfully stopped the queue");
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6232b416/gradle/scripts/dependencyDefinitions.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index ad1c1cf..c1f53db 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -61,7 +61,7 @@ ext.externalDependency = [
     "hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion,
     "hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion,
     "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
-    "helix": "org.apache.helix:helix-core:0.6.9",
+    "helix": "org.apache.helix:helix-core:0.8.1",
     "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
     "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
     "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,