You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/29 20:40:20 UTC
[gobblin] branch master updated: [GOBBLIN-1459] correct column length in MysqlSpecStore, MysqlDagStore, MysqlJobStatu…
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ee049d2 [GOBBLIN-1459] correct column length in MysqlSpecStore, MysqlDagStore, MysqlJobStatu…
ee049d2 is described below
commit ee049d20853dfaa2d0f727bc02351859260e75d7
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Jun 29 13:40:12 2021 -0700
[GOBBLIN-1459] correct column length in MysqlSpecStore, MysqlDagStore, MysqlJobStatu…
Closes #3298 from arjun4084346/columnLength
---
.../apache/gobblin/service/ServiceConfigKeys.java | 9 ++++
.../java/org/apache/gobblin/MysqlDagStore.java | 62 ++++++++++++++++++++++
.../metastore/MysqlDagStateStoreFactory.java | 48 +++++++++++++++++
.../metastore/MysqlJobStatusStateStore.java | 16 ++++++
.../apache/gobblin/metastore/MysqlStateStore.java | 6 ++-
.../apache/gobblin/service/FlowConfigV2Test.java | 30 +++++++++++
.../org/apache/gobblin/runtime/api/FlowSpec.java | 6 +++
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 20 ++++---
.../modules/orchestration/MysqlDagStateStore.java | 11 ++--
.../service/monitoring/KafkaJobStatusMonitor.java | 9 ++--
.../monitoring/MysqlJobStatusRetrieverTest.java | 53 ++++++++++++++++++
11 files changed, 251 insertions(+), 19 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 4a4c97b..5edc975 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -132,4 +132,13 @@ public class ServiceConfigKeys {
// Group Membership authentication service
public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService";
+
+ // Maximum flow name / flow group lengths. These mirror the limits declared in FlowId.pdl and are used to size
+ // MySQL key columns (spec store, DAG store, job status store), so they must be kept in sync with that schema.
+ public static final int MAX_FLOW_NAME_LENGTH = 128; // defined in FlowId.pdl
+ public static final int MAX_FLOW_GROUP_LENGTH = 128; // defined in FlowId.pdl
+ // Maximum job name / job group lengths. A job status table name is
+ // flowExecutionId + "." + jobGroup + "." + jobName + "." + "gst", so its longest form is
+ // 13 + 1 + 374 + 1 + 374 + 1 + 3 = 767 characters.
+ // NOTE(review): 767 presumably targets MySQL's 767-byte InnoDB index key prefix limit (latin1 = 1 byte/char) — confirm.
+ public static final int MAX_JOB_NAME_LENGTH = 374;
+ public static final int MAX_JOB_GROUP_LENGTH = 374;
+ // Suffix of job status state store table names. "gst" refers to GaaS-orchestrated Gobblin jobs and is deliberately
+ // different from the Gobblin job state store suffix "jst" to avoid confusion.
+ public static final String STATE_STORE_TABLE_SUFFIX = "gst";
+ // Separator used in job status state store keys: store name = flowGroup.flowName,
+ // table name = flowExecutionId.jobGroup.jobName.gst.
+ public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
+ // Separator used in DAG store keys: dagId = flowGroup_flowName_flowExecutionId (e.g. group1_name1_1234).
+ public static final String DAG_STORE_KEY_SEPARATION_CHARACTER = "_";
+
}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
new file mode 100644
index 0000000..e373577
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/MysqlDagStore.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import java.io.IOException;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
+/**
+ * A {@link MysqlStateStore} for DAGs whose key columns are sized to the longest possible DAG store keys.
+ *
+ * <p>The DAG state store keys its rows with {@code store_name = flowGroup + "_" + flowName} and
+ * {@code table_name = flowExecutionId} (e.g. store {@code group1_name1}, table {@code 1234}), so the column
+ * widths below are derived from the flow id limits in {@link ServiceConfigKeys}.
+ *
+ * @param <T> state object type
+ */
+@Slf4j
+public class MysqlDagStore<T extends State> extends MysqlStateStore<T> {
+  /** Length of a flow execution id, which is a 13-digit epoch timestamp. */
+  private static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13;
+
+  /**
+   * Manages the persistence and retrieval of {@link State} in a MySQL database
+   * @param dataSource the {@link DataSource} object for connecting to MySQL
+   * @param stateStoreTableName the table for storing the state in rows keyed by two levels (store_name, table_name)
+   * @param compressedValues should values be compressed for storage?
+   * @param stateClass class of the {@link State}s stored in this state store
+   * @throws IOException in case of failures
+   */
+  public MysqlDagStore(DataSource dataSource, String stateStoreTableName, boolean compressedValues,
+      Class<T> stateClass)
+      throws IOException {
+    super(dataSource, stateStoreTableName, compressedValues, stateClass);
+  }
+
+  /**
+   * Returns the CREATE TABLE statement for the DAG state table. The {@code $TABLE$} placeholder is replaced
+   * with the configured table name by {@link MysqlStateStore}.
+   */
+  @Override
+  protected String getCreateJobStateTableTemplate() {
+    // store_name = flowGroup + DAG separator + flowName; size it with the separator the DAG store actually uses.
+    int maxStoreName = ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH
+        + ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER.length()
+        + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH;
+    // table_name holds only the flow execution id.
+    int maxTableName = MAX_FLOW_EXECUTION_ID_LENGTH;
+
+    return "CREATE TABLE IF NOT EXISTS $TABLE$ (store_name varchar(" + maxStoreName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
+        + "table_name varchar(" + maxTableName + ") CHARACTER SET latin1 COLLATE latin1_bin not null,"
+        + " modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,"
+        + " state longblob, primary key(store_name, table_name))";
+  }
+}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDagStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDagStateStoreFactory.java
new file mode 100644
index 0000000..5327c4b
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDagStateStoreFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import org.apache.commons.dbcp.BasicDataSource;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.MysqlDagStore;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link MysqlStateStoreFactory} that builds {@link MysqlDagStore}s, i.e. MySQL state stores whose key columns
+ * are sized for DAG store keys.
+ */
+public class MysqlDagStateStoreFactory extends MysqlStateStoreFactory {
+  /**
+   * Creates a {@link MysqlDagStore} from {@code config}.
+   *
+   * <p>The table name and the compression flag come from {@link ConfigurationKeys#STATE_STORE_DB_TABLE_KEY} and
+   * {@link ConfigurationKeys#STATE_STORE_COMPRESSED_VALUES_KEY} (falling back to their defaults); the data source
+   * is obtained from {@link MysqlDataSourceFactory} through the implicit shared-resources broker.
+   *
+   * @throws RuntimeException wrapping any failure to obtain the data source or to construct the store
+   */
+  @Override
+  public <T extends State> MysqlDagStore<T> createStateStore(Config config, Class<T> stateClass) {
+    String tableName = ConfigUtils.getString(config,
+        ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+    boolean compress = ConfigUtils.getBoolean(config,
+        ConfigurationKeys.STATE_STORE_COMPRESSED_VALUES_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES);
+
+    try {
+      BasicDataSource dataSource =
+          MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+      return new MysqlDagStore<>(dataSource, tableName, compress, stateClass);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create MysqlDagStore with factory", e);
+    }
+  }
+}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
index aa73b37..210b3cb 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
import org.apache.gobblin.metastore.predicates.StateStorePredicate;
import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+import org.apache.gobblin.service.ServiceConfigKeys;
@Slf4j
@@ -71,6 +72,21 @@ public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore<T
}
+  /**
+   * Returns the CREATE TABLE statement for the job status table, with {@code $TABLE$} as the table-name
+   * placeholder. The key columns are sized to the longest store and table names the job status monitor writes.
+   */
@Override
+  protected String getCreateJobStateTableTemplate() {
+    // Key layouts produced by the job status monitor:
+    //   store_name = flowGroup.flowName
+    //   table_name = flowExecutionId.jobGroup.jobName.gst
+    final int separatorLength = ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER.length();
+    final int flowExecutionIdLength = 13; // flowExecutionId is an epoch timestamp
+
+    int storeNameWidth = ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + separatorLength + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH;
+    int tableNameWidth = flowExecutionIdLength
+        + separatorLength + ServiceConfigKeys.MAX_JOB_GROUP_LENGTH
+        + separatorLength + ServiceConfigKeys.MAX_JOB_NAME_LENGTH
+        + separatorLength + ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX.length();
+
+    String storeNameColumn = "store_name varchar(" + storeNameWidth + ") CHARACTER SET latin1 COLLATE latin1_bin not null,";
+    String tableNameColumn = "table_name varchar(" + tableNameWidth + ") CHARACTER SET latin1 COLLATE latin1_bin not null,";
+    return "CREATE TABLE IF NOT EXISTS $TABLE$ (" + storeNameColumn + tableNameColumn
+        + " modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,"
+        + " state longblob, primary key(store_name, table_name))";
+  }
+
+ @Override
public List<DatasetStateStoreEntryManager<T>> getMetadataForTables(StateStorePredicate predicate)
throws IOException {
List<DatasetStateStoreEntryManager<T>> entryManagers = Lists.newArrayList();
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index e472ad1..b96c877 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -162,7 +162,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
SELECT_METADATA_SQL = SELECT_METADATA_TEMPLATE.replace("$TABLE$", stateStoreTableName);
// create table if it does not exist
- String createJobTable = CREATE_JOB_STATE_TABLE_TEMPLATE.replace("$TABLE$", stateStoreTableName);
+ String createJobTable = getCreateJobStateTableTemplate().replace("$TABLE$", stateStoreTableName);
try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement = connection.prepareStatement(createJobTable)) {
createStatement.executeUpdate();
@@ -171,6 +171,10 @@ public class MysqlStateStore<T extends State> implements StateStore<T> {
}
}
+ /**
+  * Returns the statement used to create the state store table if it does not exist, with {@code $TABLE$} as a
+  * placeholder that the caller replaces with the configured table name. Subclasses override this to size the
+  * {@code store_name}/{@code table_name} columns for their own key formats.
+  *
+  * <p>NOTE(review): this appears to be invoked while the store is being set up (it feeds the CREATE TABLE
+  * statement executed above), i.e. during construction; overrides should therefore not depend on subclass
+  * instance fields, which would not be initialized yet — confirm.
+  */
+ protected String getCreateJobStateTableTemplate() {
+ return CREATE_JOB_STATE_TABLE_TEMPLATE;
+ }
+
/**
* creates a new {@link BasicDataSource}
* @param config the properties used for datasource instantiation
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index aa067c0..bf96f0a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -345,6 +345,36 @@ public class FlowConfigV2Test {
}
}
+  /**
+   * Verifies that creating a flow config whose flow group and flow name each exceed the FlowId.pdl length limit
+   * by one character is rejected with HTTP 422.
+   */
+  @Test
+  public void testInvalidFlowId() throws Exception {
+    Map<String, String> flowProperties = Maps.newHashMap();
+    flowProperties.put("param1", "value1");
+    // One character over each limit, so schema validation must reject the id.
+    String tooLongFlowGroup = repeatChar('A', ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + 1);
+    String tooLongFlowName = repeatChar('A', ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + 1);
+
+    FlowConfig flowConfig = new FlowConfig()
+        .setId(new FlowId().setFlowGroup(tooLongFlowGroup).setFlowName(tooLongFlowName))
+        .setTemplateUris(TEST_TEMPLATE_URI)
+        .setProperties(new StringMap(flowProperties));
+    try {
+      _client.createFlowConfig(flowConfig);
+    } catch (RestLiResponseException e) {
+      Assert.assertEquals(e.getStatus(), HttpStatus.ORDINAL_422_Unprocessable_Entity);
+      Assert.assertTrue(e.getMessage().contains("is out of range"));
+      return;
+    }
+
+    Assert.fail("Expected createFlowConfig to reject a FlowId whose group and name exceed the maximum lengths");
+  }
+
+  /** Returns a string made of {@code count} copies of {@code c}. */
+  private static String repeatChar(char c, int count) {
+    StringBuilder sb = new StringBuilder(count);
+    for (int i = 0; i < count; i++) {
+      sb.append(c);
+    }
+    return sb.toString();
+  }
+
+
@AfterClass(alwaysRun = true)
public void tearDown() throws Exception {
if (_client != null) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index a3d2fff..c66a531 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -47,6 +47,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
@@ -467,5 +468,10 @@ public class FlowSpec implements Configurable, Spec {
.setFlowName(flowProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)))
.setProperties(flowPropsAsStringMap);
}
+
+    /**
+     * Returns the maximum length of a flow spec URI: the scheme, a ':' and two path segments
+     * (separator + flow group, separator + flow name) with group and name at their maximum lengths.
+     * Used to size the {@code spec_uri} column of the MySQL spec store.
+     * NOTE(review): assumes the URI shape is {@code <scheme>:/<flowGroup>/<flowName>} — confirm against the URI builder.
+     */
+    public static int maxFlowSpecUriLength() {
+      int schemeWithColon = URI_SCHEME.length() + ":".length(); // URI separator
+      int groupSegment = URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
+      int nameSegment = URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH;
+      return schemeWithColon + groupSegment + nameSegment;
+    }
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index ab644af..a9364d8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -52,6 +52,7 @@ import org.apache.gobblin.runtime.api.SpecSearchObject;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
@@ -74,13 +75,6 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
public static final String DEFAULT_TAG_VALUE = "";
private static final String NEW_COLUMN = "spec_json";
- private static final String CREATE_TABLE_STATEMENT =
- "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, flow_group VARCHAR(128), flow_name VARCHAR(128), "
- + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), source_identifier VARCHAR(128), "
- + "destination_identifier VARCHAR(128), schedule VARCHAR(128), tag VARCHAR(128) NOT NULL, "
- + "modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
- + "isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), "
- + "spec LONGBLOB, " + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
+ "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, " + NEW_COLUMN + ") "
@@ -108,13 +102,23 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
this.specSerDe = specSerDe;
try (Connection connection = this.dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, this.tableName))) {
+ PreparedStatement statement = connection.prepareStatement(getCreateJobStateTableTemplate())) {
statement.executeUpdate();
} catch (SQLException e) {
throw new IOException(e);
}
}
+  /**
+   * Returns the {@code CREATE TABLE IF NOT EXISTS} statement for this spec store's table.
+   *
+   * <p>{@code spec_uri}, {@code flow_group} and {@code flow_name} are sized from the flow id limits in
+   * {@link ServiceConfigKeys} so that every valid flow spec fits. Despite the method name, the returned string is
+   * the final statement (the table name is already substituted), not a template.
+   *
+   * <p>NOTE(review): this appears to be called from the constructor; overrides must not rely on subclass fields.
+   */
+  protected String getCreateJobStateTableTemplate() {
+    return "CREATE TABLE IF NOT EXISTS " + this.tableName
+        + " (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength() + ") NOT NULL, "
+        + "flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), "
+        // flow_name is sized by the flow *name* limit, not the flow group limit.
+        + "flow_name VARCHAR(" + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + "), "
+        + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), source_identifier VARCHAR(128), "
+        + "destination_identifier VARCHAR(128), schedule VARCHAR(128), tag VARCHAR(128) NOT NULL, "
+        + "modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+        + "isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), spec LONGBLOB, "
+        + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
+  }
+
@Override
public boolean existsImpl(URI specUri) throws IOException {
try (Connection connection = this.dataSource.getConnection();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 95ff619..9f98a4b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -26,13 +26,14 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.MysqlDagStateStoreFactory;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlStateStoreEntryManager;
-import org.apache.gobblin.metastore.MysqlStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.predicates.StateStorePredicate;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -97,7 +98,7 @@ public class MysqlDagStateStore implements DagStateStore {
*/
protected StateStore<State> createStateStore(Config config) {
try {
+ // Use the DAG-specific factory so the table's store_name/table_name columns are sized for DAG store keys
+ // (flowGroup_flowName and flowExecutionId) rather than the generic state store widths.
- return (MysqlStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
+ return (MysqlDagStateStoreFactory.class.newInstance()).createStateStore(config, State.class);
} catch (ReflectiveOperationException rfoe) {
throw new RuntimeException("A MySQL StateStore cannot be correctly initialized due to:", rfoe);
}
@@ -148,21 +149,21 @@ public class MysqlDagStateStore implements DagStateStore {
* e.g. storeName = group1_name1, tableName = 1234 gives dagId group1_name1_1234
*/
private String entryToDagId(String storeName, String tableName) {
+ // dagId = storeName + DAG separator + tableName, i.e. flowGroup_flowName_flowExecutionId.
- return Joiner.on("_").join(storeName, tableName);
+ return Joiner.on(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER).join(storeName, tableName);
}
/**
* Return a storeName given a dagId. Store name is defined as flowGroup_flowName.
*/
private String getStoreNameFromDagId(String dagId) {
+ // Split at the LAST separator: flow group/name may themselves contain the separator, the trailing
+ // flowExecutionId (an epoch timestamp) does not.
+ // NOTE(review): if dagId contains no separator, lastIndexOf returns -1 and substring throws
+ // StringIndexOutOfBoundsException — confirm callers never pass such ids.
- return dagId.substring(0, dagId.lastIndexOf('_'));
+ return dagId.substring(0, dagId.lastIndexOf(ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER));
}
/**
* Return a tableName given a dagId. Table name is defined as the flowExecutionId.
*/
private String getTableNameFromDagId(String dagId) {
- return dagId.substring(dagId.lastIndexOf('_') + 1);
+ // Skip the whole separator instead of assuming it is one character long; with no separator the whole id is
+ // returned, exactly as before.
+ String separator = ServiceConfigKeys.DAG_STORE_KEY_SEPARATION_CHARACTER;
+ int separatorIndex = dagId.lastIndexOf(separator);
+ return separatorIndex < 0 ? dagId : dagId.substring(separatorIndex + separator.length());
}
/**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index abd15aa..b53b76e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.retention.DatasetCleanerTask;
import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ConfigUtils;
@@ -64,8 +65,6 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
- //We use table suffix that is different from the Gobblin job state store suffix of jst to avoid confusion.
- //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
- public static final String STATE_STORE_TABLE_SUFFIX = "gst";
- public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
+ // The job status state store table suffix ("gst", for GaaS-orchestrated Gobblin jobs, distinct from the Gobblin
+ // job state store suffix "jst" to avoid confusion) and the key separator are defined in ServiceConfigKeys.
public static final String GET_AND_SET_JOB_STATUS = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
JOB_STATUS_MONITOR_PREFIX, "getAndSetJobStatus");
@@ -218,7 +217,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
public static String jobStatusTableName(String flowExecutionId, String jobGroup, String jobName) {
+ // Table name layout: flowExecutionId.jobGroup.jobName.gst
- return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+ return Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, ServiceConfigKeys.STATE_STORE_TABLE_SUFFIX);
}
public static String jobStatusTableName(long flowExecutionId, String jobGroup, String jobName) {
@@ -226,11 +225,11 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
}
public static String jobStatusStoreName(String flowGroup, String flowName) {
+ // Store name layout: flowGroup.flowName
- return Joiner.on(STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ return Joiner.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
}
public static long getExecutionIdFromTableName(String tableName) {
+ // The flow execution id is the first separator-delimited token of the table name (see jobStatusTableName);
+ // Long.parseLong throws NumberFormatException if that token is not a long.
- return Long.parseLong(Splitter.on(STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
+ return Long.parseLong(Splitter.on(ServiceConfigKeys.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
}
+ /**
+  * Implemented by subclasses to turn a decodeable Kafka record into a job status {@link org.apache.gobblin.configuration.State}.
+  * NOTE(review): whether a null return is allowed (e.g. for non job-status events) is not visible here — confirm
+  * against the implementations.
+  */
public abstract org.apache.gobblin.configuration.State parseJobStatus(DecodeableKafkaRecord<byte[],byte[]> message);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index 88e60f5..ac924fb 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -18,15 +18,24 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+import com.google.common.base.Strings;
+
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
@@ -75,6 +84,50 @@ public class MysqlJobStatusRetrieverTest extends JobStatusRetrieverTest {
super.testGetLatestExecutionIdsForFlow();
}
+  /** A job status whose every key component is at its maximum length must still be stored and retrievable. */
+  @Test
+  public void testMaxColumnName() throws Exception {
+    long flowExecutionId = 12340L;
+    String flowGroup = Strings.repeat("A", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH);
+    String flowName = Strings.repeat("B", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+    State jobStatus = createOrchestratedJobStatus(flowGroup, flowName, flowExecutionId);
+
+    KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
+    Iterator<JobStatus> jobStatusIterator =
+        this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    Assert.assertTrue(jobStatusIterator.hasNext());
+    Assert.assertEquals(jobStatusIterator.next().getFlowGroup(), flowGroup);
+  }
+
+  /**
+   * A flow group one character over the limit makes the store name (flowGroup.flowName) longer than the
+   * store_name column, so the write must fail with MySQL's "Data too long" error.
+   */
+  @Test
+  public void testInvalidColumnName() {
+    long flowExecutionId = 12340L;
+    String flowGroup = Strings.repeat("A", ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + 1);
+    String flowName = Strings.repeat("B", ServiceConfigKeys.MAX_FLOW_NAME_LENGTH);
+    State jobStatus = createOrchestratedJobStatus(flowGroup, flowName, flowExecutionId);
+
+    try {
+      KafkaJobStatusMonitor.addJobStatusToStateStore(jobStatus, this.jobStatusRetriever.getStateStore());
+    } catch (IOException e) {
+      Throwable cause = e.getCause();
+      // Guard against a missing cause/message so an unexpected failure is reported, not masked by an NPE.
+      Assert.assertTrue(cause != null && cause.getMessage() != null && cause.getMessage().contains("Data too long"),
+          "Unexpected failure: " + e);
+      return;
+    }
+    Assert.fail("Expected an IOException for a flow group longer than " + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH);
+  }
+
+  /** Builds an ORCHESTRATED job status for the given flow whose job name and job group are at their maximum lengths. */
+  private static State createOrchestratedJobStatus(String flowGroup, String flowName, long flowExecutionId) {
+    Properties properties = new Properties();
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, Strings.repeat("C", ServiceConfigKeys.MAX_JOB_NAME_LENGTH));
+    properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, Strings.repeat("D", ServiceConfigKeys.MAX_JOB_GROUP_LENGTH));
+    return new State(properties);
+  }
+
@Override
void cleanUpDir() throws Exception {
this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP, FLOW_NAME));