You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/01/11 01:30:20 UTC

[gobblin] branch master updated: [GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aaf48529 [GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)
6aaf48529 is described below

commit 6aaf4852984f36420cbcbb850ddda7d96393c83f
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Tue Jan 10 17:30:12 2023 -0800

    [GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)
    
    * [GOBBLIN-1744] Improve logging for null cases when querying Helix
    
    * Throw a descriptive exception when Helix / ZooKeeper returns an invalid state
---
 .../GobblinHelixUnexpectedStateException.java      | 29 +++++++
 .../cluster/HelixAssignedParticipantCheck.java     | 24 ++++--
 .../org/apache/gobblin/cluster/HelixUtils.java     | 38 ++++++---
 .../org/apache/gobblin/cluster/HelixUtilsTest.java | 93 +++++++++++++++++++---
 4 files changed, 153 insertions(+), 31 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java
new file mode 100644
index 000000000..ec5a74844
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+/**
+ * Exception to describe situations where Gobblin sees unexpected state from Helix. Historically, we've seen unexpected
+ * null values, which bubble up as NPEs. This exception is explicitly used to differentiate bad Gobblin code from
+ * Helix failures (i.e. seeing an NPE implies a Gobblin bug).
+ */
+public class GobblinHelixUnexpectedStateException extends Exception {
+  public GobblinHelixUnexpectedStateException(String message, Object... args) {
+    super(String.format(message, args));
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
index 4017143b7..12439c752 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -139,16 +139,24 @@ public class HelixAssignedParticipantCheck implements CommitStep {
 
       if (jobContext != null) {
         String participant = jobContext.getAssignedParticipant(partitionNum);
-        if (participant != null) {
-          boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
-          if (!isAssignedParticipant) {
-            log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
-                helixInstanceName, participant);
-          }
-
-          return isAssignedParticipant;
+        if (participant == null) {
+          log.error("The current assigned participant is null. This implies that \n"
+              + "\t\t(a)Helix failed to write to zookeeper, which is often caused by lack of compression leading / exceeding zookeeper jute max buffer size (Default 1MB)\n"
+              + "\t\t(b)Helix reassigned the task (unlikely if this current task has been running without issue. Helix does not have code for reassigning \"running\" tasks)\n"
+              + "\t\tNote: This logic is true as of Helix version 1.0.2 and ZK version 3.6");
+
+          return false;
+        }
+
+        boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
+        if (!isAssignedParticipant) {
+          log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
+              helixInstanceName, participant);
         }
+
+        return isAssignedParticipant;
       }
+
       return false;
     };
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index ce217ed20..308229b34 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -28,10 +28,13 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.helix.HelixAdmin;
@@ -54,13 +57,7 @@ import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.listeners.JobListener;
-
-import static org.apache.helix.task.TaskState.STOPPED;
+import static org.apache.helix.task.TaskState.*;
 
 
 /**
@@ -393,13 +390,28 @@ public class HelixUtils {
    *
    * @param jobNames a list of Gobblin job names.
    * @return a map from jobNames to their Helix Workflow Ids.
+   * @throws GobblinHelixUnexpectedStateException when there is inconsistent helix state. This implies that we should retry the call
+   * to avoid acting on stale data
    */
-  public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames) {
-    Map<String, String> jobNameToWorkflowId = new HashMap<>();
+  public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames)
+    throws GobblinHelixUnexpectedStateException {
     TaskDriver taskDriver = new TaskDriver(helixManager);
+    return getWorkflowIdsFromJobNames(taskDriver, jobNames);
+  }
+
+  public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames)
+      throws GobblinHelixUnexpectedStateException {
+    Map<String, String> jobNameToWorkflowId = new HashMap<>();
     Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
-    for (String workflow : workflowConfigMap.keySet()) {
-      WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
+    for (Map.Entry<String, WorkflowConfig> entry : workflowConfigMap.entrySet()) {
+      String workflow = entry.getKey();
+      WorkflowConfig workflowConfig = entry.getValue();
+      if (workflowConfig == null) {
+        // As of Helix 1.0.2 implementation, this in theory shouldn't happen. But this null check is here in case implementation changes
+        // because the API doesn't technically prohibit null configs, maps allowing null values is implementation based, and we want to fail loudly with a clear root cause.
+        // the caller of this API should retry this API call
+        throw new GobblinHelixUnexpectedStateException("Received null workflow config from Helix. We should not see any null configs when reading all workflows. workflowId=%s", workflow);
+      }
       //Filter out any stale Helix workflows which are not running.
       if (workflowConfig.getTargetState() != TargetState.START) {
         continue;
@@ -450,4 +462,4 @@ public class HelixUtils {
       log.error("Could not drop instance: {} due to: {}", helixInstanceName, e);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
index d7841a88b..e3ea1155f 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
@@ -19,22 +19,36 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Properties;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import org.apache.gobblin.util.ConfigUtils;
+import static org.testng.Assert.*;
 
 
 /**
@@ -66,18 +80,77 @@ public class HelixUtilsTest {
     Assert.assertNotNull(url, "Could not find resource " + url);
 
     Config config = ConfigFactory.parseURL(url).resolve();
-    Assert.assertEquals(config.getString("k1"), "v1");
-    Assert.assertEquals(config.getString("k2"), "v1");
-    Assert.assertEquals(config.getInt("k3"), 1000);
+    assertEquals(config.getString("k1"), "v1");
+    assertEquals(config.getString("k2"), "v1");
+    assertEquals(config.getInt("k3"), 1000);
     Assert.assertTrue(config.getBoolean("k4"));
-    Assert.assertEquals(config.getLong("k5"), 10000);
+    assertEquals(config.getLong("k5"), 10000);
 
     Properties properties = ConfigUtils.configToProperties(config);
-    Assert.assertEquals(properties.getProperty("k1"), "v1");
-    Assert.assertEquals(properties.getProperty("k2"), "v1");
-    Assert.assertEquals(properties.getProperty("k3"), "1000");
-    Assert.assertEquals(properties.getProperty("k4"), "true");
-    Assert.assertEquals(properties.getProperty("k5"), "10000");
+    assertEquals(properties.getProperty("k1"), "v1");
+    assertEquals(properties.getProperty("k2"), "v1");
+    assertEquals(properties.getProperty("k3"), "1000");
+    assertEquals(properties.getProperty("k4"), "true");
+    assertEquals(properties.getProperty("k5"), "10000");
+  }
+
+  @Test
+  public void testGetWorkunitIdForJobNames() throws GobblinHelixUnexpectedStateException {
+    final String HELIX_JOB = "job";
+    final String GOBBLIN_JOB_NAME = "gobblin-job-name";
+
+    TaskDriver driver = Mockito.mock(TaskDriver.class);
+    WorkflowConfig workflowCfg = Mockito.mock(WorkflowConfig.class);
+    JobDag dag = Mockito.mock(JobDag.class);
+    JobConfig jobCfg = Mockito.mock(JobConfig.class);
+    TaskConfig taskCfg = Mockito.mock(TaskConfig.class);
+
+    /**
+     * Mocks for setting up the workflow, job dag, job names, etc.
+     *
+     * Example of task cfg
+     * "mapFields" : {
+     *     "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c" : {
+     *       "TASK_SUCCESS_OPTIONAL" : "true",
+     *       "job.id" : "job_KafkaHdfsStreamingTracking_1668738617409",
+     *       "job.name" : "KafkaHdfsStreamingTracking",
+     *       "task.id" : "task_KafkaHdfsStreamingTracking_1668738617409_179",
+     *       "gobblin.cluster.work.unit.file.path" : "<SOME PATH>",
+     *       "TASK_ID" : "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c"
+     *     },
+     */
+    Mockito.when(driver.getWorkflows()).thenReturn(ImmutableMap.of(
+        "workflow-1", workflowCfg
+    ));
+
+    Mockito.when(workflowCfg.getTargetState()).thenReturn(TargetState.START);
+    Mockito.when(workflowCfg.getJobDag()).thenReturn(dag);
+    Mockito.when(dag.getAllNodes()).thenReturn(new HashSet<>(Arrays.asList(HELIX_JOB)));
+    Mockito.when(driver.getJobConfig(HELIX_JOB)).thenReturn(jobCfg);
+    Mockito.when(jobCfg.getTaskConfigMap()).thenReturn(ImmutableMap.of("stub-guid", taskCfg));
+    Mockito.when(taskCfg.getConfigMap()).thenReturn(ImmutableMap.of(ConfigurationKeys.JOB_NAME_KEY, GOBBLIN_JOB_NAME));
+
+    assertEquals(
+        HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME)),
+        ImmutableMap.of(GOBBLIN_JOB_NAME, "workflow-1"));
+  }
+
+  @Test(expectedExceptions = GobblinHelixUnexpectedStateException.class)
+  public void testGetWorkunitIdForJobNamesWithInvalidHelixState() throws GobblinHelixUnexpectedStateException {
+    final String GOBBLIN_JOB_NAME = "gobblin-job-name";
+
+    TaskDriver driver = Mockito.mock(TaskDriver.class);
+
+    Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
+    workflowConfigMap.put("null-workflow-to-throw-exception", null);
+    Mockito.when(driver.getWorkflows()).thenReturn(workflowConfigMap);
+
+    try {
+      HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME));
+    } catch (GobblinHelixUnexpectedStateException e) {
+      e.printStackTrace();
+      throw e;
+    }
   }
 
   @AfterClass