You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/29 20:40:20 UTC

[gobblin] branch master updated: [GOBBLIN-1459] correct column length in MysqlSpecStore, MysqlDagStore, MysqlJobStatu…

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ee049d2  [GOBBLIN-1459] correct column length in MysqlSpecStore, MysqlDagStore, MysqlJobStatu…
ee049d2 is described below

commit ee049d20853dfaa2d0f727bc02351859260e75d7
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Jun 29 13:40:12 2021 -0700

    [GOBBLIN-1459] correct column length in MysqlSpecStore, MysqlDagStore, MysqlJobStatu…
    
    Closes #3298 from arjun4084346/columnLength
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  9 ++++
 .../java/org/apache/gobblin/MysqlDagStore.java     | 62 ++++++++++++++++++++++
 .../metastore/MysqlDagStateStoreFactory.java       | 48 +++++++++++++++++
 .../metastore/MysqlJobStatusStateStore.java        | 16 ++++++
 .../apache/gobblin/metastore/MysqlStateStore.java  |  6 ++-
 .../apache/gobblin/service/FlowConfigV2Test.java   | 30 +++++++++++
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |  6 +++
 .../gobblin/runtime/spec_store/MysqlSpecStore.java | 20 ++++---
 .../modules/orchestration/MysqlDagStateStore.java  | 11 ++--
 .../service/monitoring/KafkaJobStatusMonitor.java  |  9 ++--
 .../monitoring/MysqlJobStatusRetrieverTest.java    | 53 ++++++++++++++++++
 11 files changed, 251 insertions(+), 19 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 4a4c97b..5edc975 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -132,4 +132,13 @@ public class ServiceConfigKeys {
   // Group Membership authentication service
   public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
   public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService";
+
+  public static final int MAX_FLOW_NAME_LENGTH = 128; // defined in FlowId.pdl
+  public static final int MAX_FLOW_GROUP_LENGTH = 128; // defined in FlowId.pdl
+  public static final int MAX_JOB_NAME_LENGTH = 374;
+  public static final int MAX_JOB_GROUP_LENGTH = 374;
+  public static final String STATE_STORE_TABLE_SUFFIX = "gst";
+  public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
+  public static final String DAG_STORE_KEY_SEPARATION_CHARACTER = "_";
+
 }
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
new file mode 100644
index 0000000..e373577
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import java.io.IOException;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * A {@link MysqlStateStore} for persisting Dags. Its key columns are sized for dag keys:
+ * store_name holds the flow group and flow name, table_name holds the flowExecutionId.
+ *
+ * @param <T> state object type
+ */
+@Slf4j
+public class MysqlDagStore<T extends State> extends MysqlStateStore<T> {
+  /**
+   * Manages the persistence and retrieval of {@link State} in a MySQL database
+   * @param dataSource the {@link DataSource} object for connecting to MySQL
+   * @param stateStoreTableName the table for storing the state in rows keyed by two levels (store_name, table_name)
+   * @param compressedValues should values be compressed for storage?
+   * @param stateClass class of the {@link State}s stored in this state store
+   * @throws IOException in case of failures
+   */
+  public MysqlDagStore(DataSource dataSource, String stateStoreTableName, boolean compressedValues,
+      Class<T> stateClass) throws IOException {
+    super(dataSource, stateStoreTableName, compressedValues, stateClass);
+  }
+
+  @Override
+  protected String getCreateJobStateTableTemplate() {
+    // Dag store names join flow group and flow name with the dag-store separator, so size the column with it.
+    int maxStoreName = ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER.length()
+        + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH;
+    int maxTableName = 13; // length of flowExecutionId which is epoch timestamp
+    return "CREATE TABLE IF NOT EXISTS $TABLE$ (store_name varchar(" + maxStoreName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
+        + "table_name varchar(" + maxTableName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
+        + " modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,"
+        + " state longblob, primary key(store_name, table_name))";
+  }
+}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDagStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDagStateStoreFactory.java
new file mode 100644
index 0000000..5327c4b
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDagStateStoreFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.MysqlDagStore;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/** A {@link MysqlStateStoreFactory} variant that hands out {@link MysqlDagStore} instances. */
+public class MysqlDagStateStoreFactory extends MysqlStateStoreFactory {
+  @Override
+  public <T extends State> MysqlDagStore<T> createStateStore(Config config, Class<T> stateClass) {
+    // Table name and compression flag fall back to the defaults declared in ConfigurationKeys.
+    String tableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+    boolean compress = ConfigUtils.getBoolean(config, ConfigurationKeys.STATE_STORE_COMPRESSED_VALUES_KEY,
+        ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES);
+
+    try {
+      BasicDataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+      return new MysqlDagStore<>(dataSource, tableName, compress, stateClass);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create MysqlDagStore with factory", e);
+    }
+  }
+}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
index aa73b37..210b3cb 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
 import org.apache.gobblin.metastore.predicates.StateStorePredicate;
 import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.service.ServiceConfigKeys;
 
 
 @Slf4j
@@ -71,6 +72,21 @@ public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore<T
   }
 
   @Override
+  protected String getCreateJobStateTableTemplate() {
+    int separatorLength = ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER.length();
+    // store_name budget: flow name and flow group joined by one separator
+    int maxStoreName = ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + separatorLength + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
+    // table_name budget: 13-digit flowExecutionId (epoch timestamp), job name, job group and table suffix
+    int maxTableName = 13 + separatorLength + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + separatorLength
+        + ServiceConfigKeys.MAX_JOB_GROUP_LENGTH + separatorLength + ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX.length();
+
+    return "CREATE TABLE IF NOT EXISTS $TABLE$ (store_name varchar(" + maxStoreName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
+        + "table_name varchar(" + maxTableName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
+        + " modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,"
+        + " state longblob, primary key(store_name, table_name))";
+  }
+
+  @Override
   public List<DatasetStateStoreEntryManager<T>> getMetadataForTables(StateStorePredicate predicate)
       throws IOException {
     List<DatasetStateStoreEntryManager<T>> entryManagers = Lists.newArrayList();
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index e472ad1..b96c877 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -162,7 +162,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     SELECT_METADATA_SQL = SELECT_METADATA_TEMPLATE.replace("$TABLE$", stateStoreTableName);
 
     // create table if it does not exist
-    String createJobTable = CREATE_JOB_STATE_TABLE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+    String createJobTable = getCreateJobStateTableTemplate().replace("$TABLE$", stateStoreTableName);
     try (Connection connection = dataSource.getConnection();
         PreparedStatement createStatement = connection.prepareStatement(createJobTable)) {
       createStatement.executeUpdate();
@@ -171,6 +171,10 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
     }
   }
 
+  protected String getCreateJobStateTableTemplate() { // overridable hook so a store can supply its own CREATE TABLE text
+    return CREATE_JOB_STATE_TABLE_TEMPLATE; // the caller substitutes "$TABLE$" with the actual table name
+  }
+
   /**
    * creates a new {@link BasicDataSource}
    * @param config the properties used for datasource instantiation
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index aa067c0..bf96f0a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -345,6 +345,36 @@ public class FlowConfigV2Test {
     }
   }
 
+  /**
+   * Creating a flow config whose flow group and flow name are each one character longer than the
+   * limits in {@link ServiceConfigKeys} must be rejected with a 422 "out of range" response.
+   */
+  @Test
+  public void testInvalidFlowId() throws Exception {
+    String tooLongFlowGroup = new String(new char[ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + 1]).replace('\0', 'A');
+    String tooLongFlowName = new String(new char[ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + 1]).replace('\0', 'A');
+
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+
+    FlowId oversizedId = new FlowId().setFlowGroup(tooLongFlowGroup).setFlowName(tooLongFlowName);
+    FlowConfig flowConfig = new FlowConfig()
+        .setId(oversizedId)
+        .setTemplateUris(TEST_TEMPLATE_URI)
+        .setProperties(new StringMap(flowProperties));
+
+    try {
+      _client.createFlowConfig(flowConfig);
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_422_Unprocessable_Entity);
+      Assert.assertTrue(e.getMessage().contains("is out of range"));
+      return;
+    }
+
+    // Reaching this point means the over-long id was accepted without an exception.
+    Assert.fail();
+  }
+
   @AfterClass(alwaysRun = true)
   public void tearDown() throws Exception {
     if (_client != null) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index a3d2fff..c66a531 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -467,5 +468,10 @@ public class FlowSpec implements Configurable, Spec {
           .setFlowName(flowProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)))
           .setProperties(flowPropsAsStringMap);
     }
+
+    public static int maxFlowSpecUriLength() {
+      return URI_SCHEME.length() + ":".length() // URI separator
+        + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
+    }
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index ab644af..a9364d8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -52,6 +52,7 @@ import org.apache.gobblin.runtime.api.SpecSearchObject;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
 import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
@@ -74,13 +75,6 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
   public static final String DEFAULT_TAG_VALUE = "";
   private static final String NEW_COLUMN = "spec_json";
 
-  private static final String CREATE_TABLE_STATEMENT =
-      "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, flow_group VARCHAR(128), flow_name VARCHAR(128), "
-          + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), source_identifier VARCHAR(128), "
-          + "destination_identifier VARCHAR(128), schedule VARCHAR(128), tag VARCHAR(128) NOT NULL, "
-          + "modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
-          + "isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), "
-          + "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
   protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
       + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, " + NEW_COLUMN + ") "
@@ -108,13 +102,23 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
     this.specSerDe = specSerDe;
 
     try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, this.tableName))) {
+        PreparedStatement statement = connection.prepareStatement(getCreateJobStateTableTemplate())) {
       statement.executeUpdate();
     } catch (SQLException e) {
       throw new IOException(e);
     }
   }
 
+  /** Builds the complete CREATE TABLE statement for {@code tableName}; spec_uri, flow_group and flow_name use the shared limits. */
+  protected String getCreateJobStateTableTemplate() {
+    return "CREATE TABLE IF NOT EXISTS " + this.tableName + " (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength()
+        + ") NOT NULL, flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), flow_name VARCHAR("
+        + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + "), template_uri VARCHAR(128), user_to_proxy VARCHAR(128), "
+        + "source_identifier VARCHAR(128), destination_identifier VARCHAR(128), schedule VARCHAR(128), tag VARCHAR(128) NOT NULL, "
+        + "modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, isRunImmediately BOOLEAN, "
+        + "timezone VARCHAR(128), owning_group VARCHAR(128), spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
+  }
+
   @Override
   public boolean existsImpl(URI specUri) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 95ff619..9f98a4b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -26,13 +26,14 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
 import org.apache.gobblin.metastore.MysqlStateStore;
 import org.apache.gobblin.metastore.MysqlStateStoreEntryManager;
-import org.apache.gobblin.metastore.MysqlStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metastore.predicates.StateStorePredicate;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -97,7 +98,7 @@ public class MysqlDagStateStore implements DagStateStore {
    */
   protected StateStore<State> createStateStore(Config config) {
     try {
-      return (MysqlStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
+      return (MysqlDagStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
     } catch (ReflectiveOperationException rfoe) {
       throw new RuntimeException("A MySQL StateStore cannot be correctly initialized due to:", rfoe);
     }
@@ -148,21 +149,21 @@ public class MysqlDagStateStore implements DagStateStore {
    * e.g. storeName = group1_name1, tableName = 1234 gives dagId group1_name1_1234
    */
   private String entryToDagId(String storeName, String tableName) {
-    return Joiner.on("_").join(storeName, tableName);
+    return Joiner.on(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER).join(storeName, tableName);
   }
 
   /**
    * Return a storeName given a dagId. Store name is defined as flowGroup_flowName.
    */
   private String getStoreNameFromDagId(String dagId) {
-    return dagId.substring(0, dagId.lastIndexOf('_'));
+    return dagId.substring(0, dagId.lastIndexOf(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER));
   }
 
   /**
    * Return a tableName given a dagId. Table name is defined as the flowExecutionId.
    */
   private String getTableNameFromDagId(String dagId) {
-    return dagId.substring(dagId.lastIndexOf('_') + 1);
+    return dagId.substring(dagId.lastIndexOf(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER) + 1);
   }
 
   /**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index abd15aa..b53b76e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -64,8 +65,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
   //We use table suffix that is different from the Gobblin job state store suffix of jst to avoid confusion.
   //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
-  public static final String STATE_STORE_TABLE_SUFFIX = "gst";
-  public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
   public static final String GET_AND_SET_JOB_STATUS = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
       JOB_STATUS_MONITOR_PREFIX,  "getAndSetJobStatus");
 
@@ -218,7 +217,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   }
 
   public static String jobStatusTableName(String flowExecutionId, String jobGroup, String jobName) {
-    return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+    return Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
   }
 
   public static String jobStatusTableName(long flowExecutionId, String jobGroup, String jobName) {
@@ -226,11 +225,11 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
   }
 
   public static String jobStatusStoreName(String flowGroup, String flowName) {
-    return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+    return Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
   }
 
   public static long getExecutionIdFromTableName(String tableName) {
-    return Long.parseLong(Splitter.on(STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
+    return Long.parseLong(Splitter.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
   }
 
   public abstract org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 88e60f5..ac924fb 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -18,15 +18,24 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
 
+import com.google.common.base.Strings;
+
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
 
 
 public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -75,6 +84,50 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
     super.testGetLatestExecutionIdsForFlow();
   }
 
+  /** A job status whose flow and job identifiers all sit exactly at their maximum length must be stored and read back. */
+  @Test
+  public void testMaxColumnName() throws Exception {
+    long flowExecutionId = 12340L;
+    String flowGroup = Strings.repeat("A", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH);
+    String flowName = Strings.repeat("B", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+    String jobName = Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH);
+    String jobGroup = Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH);
+    Properties properties = new Properties();
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobGroup);
+    properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
+    KafkaJobStatusMonitor.addJobStatusToStateStore(new State(properties), this.jobStatusRetriever.getStateStore());
+    Iterator<JobStatus> statuses = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    Assert.assertTrue(statuses.hasNext());
+    Assert.assertEquals(statuses.next().getFlowGroup(), flowGroup);
+  }
+
+  @Test
+  public void testInvalidColumnName() {
+    Properties properties = new Properties();
+    long flowExecutionId = 12340L;
+    String flowGroup = Strings.repeat("A", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + 1);
+    String flowName = Strings.repeat("B", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+    properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
+    State jobStatus = new State(properties);
+
+    try {
+      KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
+    } catch (IOException e) {
+      Assert.assertTrue(e.getCause().getMessage().contains("Data too long"));
+      return;
+    }
+    Assert.fail();
+  }
+
   @Override
   void cleanUpDir() throws Exception {
     this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME));