You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2023/01/11 01:30:20 UTC
[gobblin] branch master updated: [GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6aaf48529 [GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)
6aaf48529 is described below
commit 6aaf4852984f36420cbcbb850ddda7d96393c83f
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Tue Jan 10 17:30:12 2023 -0800
[GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)
* [GOBBLIN-1744] Improve logging in null cases when querying from Helix
* * throws a descriptive exception when seeing invalid state from helix / zk
---
.../GobblinHelixUnexpectedStateException.java | 29 +++++++
.../cluster/HelixAssignedParticipantCheck.java | 24 ++++--
.../org/apache/gobblin/cluster/HelixUtils.java | 38 ++++++---
.../org/apache/gobblin/cluster/HelixUtilsTest.java | 93 +++++++++++++++++++---
4 files changed, 153 insertions(+), 31 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java
new file mode 100644
index 000000000..ec5a74844
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+/**
+ * Exception to describe situations where Gobblin sees unexpected state from Helix. Historically, we've seen unexpected
+ * null values, which bubble up as NPE. This exception is explicitly used to differentiate bad Gobblin code from
+ * Helix failures (i.e. seeing an NPE implies a Gobblin bug).
+ */
+public class GobblinHelixUnexpectedStateException extends Exception {
+ public GobblinHelixUnexpectedStateException(String message, Object... args) {
+ super(String.format(message, args));
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
index 4017143b7..12439c752 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -139,16 +139,24 @@ public class HelixAssignedParticipantCheck implements CommitStep {
if (jobContext != null) {
String participant = jobContext.getAssignedParticipant(partitionNum);
- if (participant != null) {
- boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
- if (!isAssignedParticipant) {
- log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
- helixInstanceName, participant);
- }
-
- return isAssignedParticipant;
+ if (participant == null) {
+ log.error("The current assigned participant is null. This implies that \n"
+ + "\t\t(a)Helix failed to write to zookeeper, which is often caused by lack of compression leading / exceeding zookeeper jute max buffer size (Default 1MB)\n"
+ + "\t\t(b)Helix reassigned the task (unlikely if this current task has been running without issue. Helix does not have code for reassigning \"running\" tasks)\n"
+ + "\t\tNote: This logic is true as of Helix version 1.0.2 and ZK version 3.6");
+
+ return false;
+ }
+
+ boolean isAssignedParticipant = participant.equalsIgnoreCase(helixInstanceName);
+ if (!isAssignedParticipant) {
+ log.info("The current helix instance is not the assigned participant. helixInstanceName={}, assignedParticipant={}",
+ helixInstanceName, participant);
}
+
+ return isAssignedParticipant;
}
+
return false;
};
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index ce217ed20..308229b34 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -28,10 +28,13 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.helix.HelixAdmin;
@@ -54,13 +57,7 @@ import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.listeners.JobListener;
-
-import static org.apache.helix.task.TaskState.STOPPED;
+import static org.apache.helix.task.TaskState.*;
/**
@@ -393,13 +390,28 @@ public class HelixUtils {
*
* @param jobNames a list of Gobblin job names.
* @return a map from jobNames to their Helix Workflow Ids.
+ * @throws GobblinHelixUnexpectedStateException when there is inconsistent helix state. This implies that we should retry the call
+ * to avoid acting on stale data
*/
- public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames) {
- Map<String, String> jobNameToWorkflowId = new HashMap<>();
+ public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager helixManager, Collection<String> jobNames)
+ throws GobblinHelixUnexpectedStateException {
TaskDriver taskDriver = new TaskDriver(helixManager);
+ return getWorkflowIdsFromJobNames(taskDriver, jobNames);
+ }
+
+ public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver taskDriver, Collection<String> jobNames)
+ throws GobblinHelixUnexpectedStateException {
+ Map<String, String> jobNameToWorkflowId = new HashMap<>();
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
- for (String workflow : workflowConfigMap.keySet()) {
- WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
+ for (Map.Entry<String, WorkflowConfig> entry : workflowConfigMap.entrySet()) {
+ String workflow = entry.getKey();
+ WorkflowConfig workflowConfig = entry.getValue();
+ if (workflowConfig == null) {
+ // As of the Helix 1.0.2 implementation, this shouldn't happen in theory. The null check is here in case the implementation changes:
+ // the API doesn't technically prohibit null configs, whether a map allows null values is implementation-dependent, and we want to fail loudly with a clear root cause.
+ // The caller of this API should retry the call.
+ throw new GobblinHelixUnexpectedStateException("Received null workflow config from Helix. We should not see any null configs when reading all workflows. workflowId=%s", workflow);
+ }
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
continue;
@@ -450,4 +462,4 @@ public class HelixUtils {
log.error("Could not drop instance: {} due to: {}", helixInstanceName, e);
}
}
-}
\ No newline at end of file
+}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
index d7841a88b..e3ea1155f 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
@@ -19,22 +19,36 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.util.ConfigUtils;
+import static org.testng.Assert.*;
/**
@@ -66,18 +80,77 @@ public class HelixUtilsTest {
Assert.assertNotNull(url, "Could not find resource " + url);
Config config = ConfigFactory.parseURL(url).resolve();
- Assert.assertEquals(config.getString("k1"), "v1");
- Assert.assertEquals(config.getString("k2"), "v1");
- Assert.assertEquals(config.getInt("k3"), 1000);
+ assertEquals(config.getString("k1"), "v1");
+ assertEquals(config.getString("k2"), "v1");
+ assertEquals(config.getInt("k3"), 1000);
Assert.assertTrue(config.getBoolean("k4"));
- Assert.assertEquals(config.getLong("k5"), 10000);
+ assertEquals(config.getLong("k5"), 10000);
Properties properties = ConfigUtils.configToProperties(config);
- Assert.assertEquals(properties.getProperty("k1"), "v1");
- Assert.assertEquals(properties.getProperty("k2"), "v1");
- Assert.assertEquals(properties.getProperty("k3"), "1000");
- Assert.assertEquals(properties.getProperty("k4"), "true");
- Assert.assertEquals(properties.getProperty("k5"), "10000");
+ assertEquals(properties.getProperty("k1"), "v1");
+ assertEquals(properties.getProperty("k2"), "v1");
+ assertEquals(properties.getProperty("k3"), "1000");
+ assertEquals(properties.getProperty("k4"), "true");
+ assertEquals(properties.getProperty("k5"), "10000");
+ }
+
+ @Test
+ public void testGetWorkunitIdForJobNames() throws GobblinHelixUnexpectedStateException {
+ final String HELIX_JOB = "job";
+ final String GOBBLIN_JOB_NAME = "gobblin-job-name";
+
+ TaskDriver driver = Mockito.mock(TaskDriver.class);
+ WorkflowConfig workflowCfg = Mockito.mock(WorkflowConfig.class);
+ JobDag dag = Mockito.mock(JobDag.class);
+ JobConfig jobCfg = Mockito.mock(JobConfig.class);
+ TaskConfig taskCfg = Mockito.mock(TaskConfig.class);
+
+ /**
+ * Mocks for setting up the workflow, job dag, job names, etc.
+ *
+ * Example of task cfg
+ * "mapFields" : {
+ * "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c" : {
+ * "TASK_SUCCESS_OPTIONAL" : "true",
+ * "job.id" : "job_KafkaHdfsStreamingTracking_1668738617409",
+ * "job.name" : "KafkaHdfsStreamingTracking",
+ * "task.id" : "task_KafkaHdfsStreamingTracking_1668738617409_179",
+ * "gobblin.cluster.work.unit.file.path" : "<SOME PATH>",
+ * "TASK_ID" : "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c"
+ * },
+ */
+ Mockito.when(driver.getWorkflows()).thenReturn(ImmutableMap.of(
+ "workflow-1", workflowCfg
+ ));
+
+ Mockito.when(workflowCfg.getTargetState()).thenReturn(TargetState.START);
+ Mockito.when(workflowCfg.getJobDag()).thenReturn(dag);
+ Mockito.when(dag.getAllNodes()).thenReturn(new HashSet<>(Arrays.asList(HELIX_JOB)));
+ Mockito.when(driver.getJobConfig(HELIX_JOB)).thenReturn(jobCfg);
+ Mockito.when(jobCfg.getTaskConfigMap()).thenReturn(ImmutableMap.of("stub-guid", taskCfg));
+ Mockito.when(taskCfg.getConfigMap()).thenReturn(ImmutableMap.of(ConfigurationKeys.JOB_NAME_KEY, GOBBLIN_JOB_NAME));
+
+ assertEquals(
+ HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME)),
+ ImmutableMap.of(GOBBLIN_JOB_NAME, "workflow-1"));
+ }
+
+ @Test(expectedExceptions = GobblinHelixUnexpectedStateException.class)
+ public void testGetWorkunitIdForJobNamesWithInvalidHelixState() throws GobblinHelixUnexpectedStateException {
+ final String GOBBLIN_JOB_NAME = "gobblin-job-name";
+
+ TaskDriver driver = Mockito.mock(TaskDriver.class);
+
+ Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
+ workflowConfigMap.put("null-workflow-to-throw-exception", null);
+ Mockito.when(driver.getWorkflows()).thenReturn(workflowConfigMap);
+
+ try {
+ HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(GOBBLIN_JOB_NAME));
+ } catch (GobblinHelixUnexpectedStateException e) {
+ e.printStackTrace();
+ throw e;
+ }
}
@AfterClass