You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/02/27 21:26:43 UTC
[gobblin] branch master updated: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow (#3640)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new af2a9c196 [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow (#3640)
af2a9c196 is described below
commit af2a9c196d74b96d51dd8ee57b87cba0baceb9a5
Author: umustafi <um...@gmail.com>
AuthorDate: Mon Feb 27 13:26:35 2023 -0800
[GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow (#3640)
* wip
* Implementation of batched get
* add metrics to measure scheduler initialization and get spec time
* implement methods for FSSpecStore
* remove comment
* refactor to consolidate pagination methods in one, fix tests
* fix service tests
* fix tests and specification for pagination to include nonneg value
* Added test cases for edge cases of pagination, ensure consumer start before scheduler
* Check for newest spec add to scheduler
* Properly add new modified_time prop to Config using setProperty
* handle modification time
* fix tests
* handle edge case, small fixes
* avoid memory leak for last updated time
---------
Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../service/FlowConfigResourceLocalHandler.java | 2 +-
.../org/apache/gobblin/runtime/api/FlowSpec.java | 3 +
.../gobblin/runtime/api/FlowSpecSearchObject.java | 2 +-
.../gobblin/runtime/api/InstrumentedSpecStore.java | 6 +-
.../org/apache/gobblin/runtime/api/SpecStore.java | 12 +-
.../gobblin/runtime/metrics/RuntimeMetrics.java | 3 +
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 10 +-
.../gobblin/runtime/spec_store/FSSpecStore.java | 33 ++++-
.../runtime/spec_store/MysqlBaseSpecStore.java | 69 ++++++---
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 36 ++++-
.../runtime/spec_store/MysqlSpecStoreTest.java | 37 +++--
.../spec_store/MysqlSpecStoreWithUpdateTest.java | 21 +--
.../modules/core/GobblinServiceManager.java | 12 ++
.../scheduler/GobblinServiceJobScheduler.java | 165 ++++++++++++++++-----
15 files changed, 318 insertions(+), 95 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 362016cdd..fe62d5eec 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -89,6 +89,8 @@ public class ConfigurationKeys {
// Job retriggering
public static final String JOB_RETRIGGERING_ENABLED = "job.retriggering.enabled";
public static final String DEFAULT_JOB_RETRIGGERING_ENABLED = "true";
+ public static final String LOAD_SPEC_BATCH_SIZE = "load.spec.batch.size";
+ public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 100;
// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 5c8dc8c41..f97b1b915 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -116,7 +116,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
* Get all flow configs in between start and start + count - 1
*/
public Collection<FlowConfig> getAllFlowConfigs(int start, int count) {
- return flowCatalog.getAllSpecs(start, count).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+ return flowCatalog.getSpecsPaginated(start, count).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
}
/**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 1e007c7c6..85e7a269b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -65,6 +65,9 @@ import org.apache.gobblin.util.ConfigUtils;
@SuppressFBWarnings(value="SE_BAD_FIELD",
justification = "FindBugs complains about Config not being serializable, but the implementation of Config is serializable")
public class FlowSpec implements Configurable, Spec {
+ // Key for Property associated with modified_time
+ public static final String MODIFICATION_TIME_KEY = "modified_time";
+
private static final long serialVersionUID = -5511988862945107734L;
/** An URI identifying the flow. */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
index dc5d6447e..226cd6cdf 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -121,7 +121,7 @@ public class FlowSpecSearchObject implements SpecSearchObject {
if (this.getCount() > 0) {
// Order by two fields to make a full order by
- limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT ?");
+ limitAndOffset.add(" ORDER BY spec_uri ASC LIMIT ?");
if (this.getStart() > 0) {
limitAndOffset.add(" OFFSET ?");
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index d2dfd7eb8..d94547989 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -171,8 +171,8 @@ public abstract class InstrumentedSpecStore implements SpecStore {
}
@Override
- public Collection<Spec> getSpecs(int start, int count) throws IOException {
- return this.getTimer.invokeMayThrowIO(() -> getSpecsImpl(start, count));
+ public Collection<Spec> getSpecsPaginated(int startOffset, int batchSize) throws IOException, IllegalArgumentException {
+ return this.getTimer.invokeMayThrowIO(() -> getSpecsPaginatedImpl(startOffset, batchSize));
}
@Override
@@ -193,7 +193,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
public abstract Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException;
public abstract int getSizeImpl() throws IOException;
- public abstract Collection<Spec> getSpecsImpl(int start, int count) throws IOException;
+ public abstract Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize) throws IOException, IllegalArgumentException;
/** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 072ce2195..d2cc41e6c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -127,10 +127,14 @@ public interface SpecStore {
Collection<Spec> getSpecs() throws IOException;
/***
- * Get all {@link Spec}s from the {@link SpecStore} with pagination input.
- * @throws IOException Exception in retrieving {@link Spec}s.
- */
- Collection<Spec> getSpecs(int start, int count) throws IOException;
+ * Retrieve a batch of {@link Spec}s of at most size batchSize beginning at startOffset after creating a unique
+ * ordering of the specs based on primary key spec_uri.
+ * @param startOffset starting row to batch the specs returned from, startOffset >= 0
+ * @param batchSize max number of specs returned in the batch, batchSize >= 0
+ * @throws IOException Exception in retrieving the {@link Spec}s.
+ * @throws IllegalArgumentException if startOffset < 0 or batchSize < 0
+ */
+ Collection<Spec> getSpecsPaginated(int startOffset, int batchSize) throws IOException, IllegalArgumentException;
/**
* Return an iterator of Spec URIs(Spec identifiers)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 1b7bb3ae5..b24ef000d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -54,6 +54,9 @@ public class RuntimeMetrics {
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
+ public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.getSpecSpeedDuringStartupAvgMillis";
+ public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize";
+ public static final String GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToInitializeSchedulerMillis";
// Metadata keys
public static final String TOPIC = "topic";
public static final String GROUP_ID = "groupId";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 2ab61e14d..19be25c51 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -317,18 +317,18 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
/**
- * A function to get all specs in the {@link SpecStore} between the provided start index and (start + count - 1) index, inclusive.
+ * A function to get a batch of specs in the {@link SpecStore} between the provided start index and (start + count - 1) index, inclusive.
* This enables pagination so getting SpecStore object will not timeout, and can be tuned to how many results is desired at any one time.
- * The {@link Spec} in {@link SpecStore} are sorted in descending order of the modified_time while paginating.
+ * The {@link Spec} in {@link SpecStore} are sorted in ascending order of the spec_uri while paginating.
*
* @param start The start index.
* @param count The total number of records to get.
* @return A collection of specs between start and start + count - 1, inclusive.
*/
- public Collection<Spec> getAllSpecs(int start, int count) {
+ public Collection<Spec> getSpecsPaginated(int start, int count) {
try {
- return specStore.getSpecs(start, count);
- } catch (IOException e) {
+ return specStore.getSpecsPaginated(start, count);
+ } catch (IOException | IllegalArgumentException e) {
throw new RuntimeException("Cannot retrieve specs from Spec stores between " + start + " and " + (start + count - 1), e);
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 6f3bbe024..422888bcd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -19,8 +19,10 @@ package org.apache.gobblin.runtime.spec_store;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -358,8 +360,35 @@ public class FSSpecStore extends InstrumentedSpecStore {
}
@Override
- public Collection<Spec> getSpecsImpl(int start, int count) throws UnsupportedOperationException {
- throw new UnsupportedOperationException();
+ public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize)
+ throws IOException {
+ if (startOffset < 0 || batchSize < 0) {
+ throw new IllegalArgumentException(String.format("Received negative offset or batch size value when they should be >= 0. "
+ + "Offset is %s and batch size is %s", startOffset, batchSize));
+ }
+ // Obtain sorted list of spec uris to paginate from
+ Iterator<URI> uriIterator = getSpecURIsImpl();
+ List<URI> sortedUris = new ArrayList<>();
+ while (uriIterator.hasNext()) {
+ sortedUris.add(uriIterator.next());
+ }
+ sortedUris.sort(URI::compareTo);
+
+ int numElements = 0;
+ List<Spec> batchOfSpecs = new ArrayList<>();
+ URI currentURI;
+
+ while (startOffset + numElements < sortedUris.size() && numElements < batchSize) {
+ currentURI = sortedUris.get(startOffset + numElements);
+ try {
+ batchOfSpecs.add(getSpecImpl(currentURI));
+ } catch (SpecNotFoundException e) {
+ log.warn("Unable to find spec for uri {} so proceeding to next URI. Stacktrace {}", currentURI, e);
+ // Fall through (no continue) so numElements still advances; otherwise the same missing URI is retried forever
+ }
+ numElements += 1;
+ }
+ return batchOfSpecs;
}
private int getSizeImpl(Path directory) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index b12360466..713c2b9d8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -28,7 +28,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import javax.sql.DataSource;
+import java.util.Properties;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -36,6 +36,7 @@ import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
+import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -81,9 +82,10 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
protected static final String UPDATE_STATEMENT = "UPDATE %s SET spec=?,spec_json=? WHERE spec_uri=? AND UNIX_TIMESTAMP(modified_time) < ?";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
private static final String GET_STATEMENT_BASE = "SELECT spec_uri, spec FROM %s WHERE ";
- private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
+ private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, modified_time FROM %s";
private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+ private static final String GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec, modified_time FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";
private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
// NOTE: using max length of a `FlowSpec` URI, as it's believed to be the longest of existing `Spec` types
private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength()
@@ -103,6 +105,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
public final String getAllStatement = String.format(getTablelessGetAllStatement(), MysqlBaseSpecStore.this.tableName);
public final String getAllURIsStatement = String.format(getTablelessGetAllURIsStatement(), MysqlBaseSpecStore.this.tableName);
public final String getAllURIsWithTagStatement = String.format(getTablelessGetAllURIsWithTagStatement(), MysqlBaseSpecStore.this.tableName);
+ public final String getBatchStatement = String.format(getTablelessGetBatchStatement(), MysqlBaseSpecStore.this.tableName);
public final String getSizeStatement = String.format(getTablelessGetSizeStatement(), MysqlBaseSpecStore.this.tableName);
public final String createTableStatement = String.format(getTablelessCreateTableStatement(), MysqlBaseSpecStore.this.tableName);
@@ -119,6 +122,26 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
return MysqlBaseSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()));
}
+ public Spec extractSpecWithModificationTime(ResultSet rs) throws SQLException, IOException {
+ Spec spec = MysqlBaseSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()));
+ // Set modified timestamp in flowSpec properties list
+ if (spec instanceof FlowSpec) {
+ long timestamp = rs.getTimestamp(FlowSpec.MODIFICATION_TIME_KEY).getTime();
+ FlowSpec flowSpec = (FlowSpec) spec;
+ Properties properties = flowSpec.getConfigAsProperties();
+ properties.setProperty(FlowSpec.MODIFICATION_TIME_KEY, String.valueOf(timestamp));
+ return flowSpec;
+ }
+ return spec;
+ }
+
+ public void completeGetBatchStatement(PreparedStatement statement, int startOffset, int batchSize)
+ throws SQLException {
+ int i = 0;
+ statement.setInt(++i, batchSize);
+ statement.setInt(++i, startOffset);
+ }
+
protected String getTablelessExistsStatement() { return MysqlBaseSpecStore.EXISTS_STATEMENT; }
protected String getTablelessUpdateStatement() { return MysqlBaseSpecStore.UPDATE_STATEMENT; }
protected String getTablelessInsertStatement() { return MysqlBaseSpecStore.INSERT_STATEMENT; }
@@ -127,6 +150,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
protected String getTablelessGetAllStatement() { return MysqlBaseSpecStore.GET_ALL_STATEMENT; }
protected String getTablelessGetAllURIsStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_STATEMENT; }
protected String getTablelessGetAllURIsWithTagStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_WITH_TAG_STATEMENT; }
+ protected String getTablelessGetBatchStatement() {return MysqlBaseSpecStore.GET_SPECS_BATCH_STATEMENT; }
protected String getTablelessGetSizeStatement() { return MysqlBaseSpecStore.GET_SIZE_STATEMENT; }
protected String getTablelessCreateTableStatement() { return MysqlBaseSpecStore.CREATE_TABLE_STATEMENT; }
}
@@ -221,7 +245,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
Iterator<Spec> resultSpecs = withPreparedStatement(this.sqlStatements.getAllStatement + " WHERE spec_uri = ?", statement -> {
statement.setString(1, specUri.toString());
- return retrieveSpecs(statement).iterator();
+ return retrieveSpecsWithModificationTime(statement).iterator();
});
if (resultSpecs.hasNext()) {
return resultSpecs.next();
@@ -260,10 +284,23 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
return specs;
}
+ protected final Collection<Spec> retrieveSpecsWithModificationTime(PreparedStatement statement) throws IOException {
+ List<Spec> specs = new ArrayList<>();
+ try (ResultSet rs = statement.executeQuery()) {
+ while (rs.next()) {
+ specs.add(this.sqlStatements.extractSpecWithModificationTime(rs));
+ }
+ } catch (SQLException | SpecSerDeException e) {
+ log.error("Failed to deserialize spec", e);
+ throw new IOException(e);
+ }
+ return specs;
+ }
+
@Override
public Iterator<URI> getSpecURIsImpl() throws IOException {
return withPreparedStatement(this.sqlStatements.getAllURIsStatement, statement -> {
- return retreiveURIs(statement).iterator();
+ return retrieveURIs(statement).iterator();
});
}
@@ -278,20 +315,14 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
}
@Override
- public Collection<Spec> getSpecsImpl(int start, int count) throws IOException {
- List<String> limitAndOffset = new ArrayList<>();
- if (count > 0) {
- // Order by two fields to make a full order by
- limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT");
- limitAndOffset.add(String.valueOf(count));
- if (start > 0) {
- limitAndOffset.add("OFFSET");
- limitAndOffset.add(String.valueOf(start));
- }
+ public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize) throws IOException, IllegalArgumentException {
+ if (startOffset < 0 || batchSize < 0) {
+ throw new IllegalArgumentException(String.format("Received negative offset or batch size value when they should be >= 0. "
+ + "Offset is %s and batch size is %s", startOffset, batchSize));
}
- String finalizedStatement = this.sqlStatements.getAllStatement + String.join(" ", limitAndOffset);
- return withPreparedStatement(finalizedStatement, statement -> {
- return retrieveSpecs(statement);
+ return withPreparedStatement(this.sqlStatements.getBatchStatement, statement -> {
+ this.sqlStatements.completeGetBatchStatement(statement, startOffset, batchSize);
+ return retrieveSpecsWithModificationTime(statement);
});
}
@@ -299,11 +330,11 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
return withPreparedStatement(this.sqlStatements.getAllURIsWithTagStatement, statement -> {
statement.setString(1, tag);
- return retreiveURIs(statement).iterator();
+ return retrieveURIs(statement).iterator();
});
}
- private List<URI> retreiveURIs(PreparedStatement statement) throws SQLException {
+ private List<URI> retrieveURIs(PreparedStatement statement) throws SQLException {
List<URI> uris = new ArrayList<>();
try (ResultSet rs = statement.executeQuery()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 00e203da1..3e9cd4351 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -17,10 +17,6 @@
package org.apache.gobblin.runtime.spec_store;
-import com.google.common.base.Charsets;
-import com.google.common.io.ByteStreams;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
@@ -28,7 +24,15 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Properties;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -38,7 +42,8 @@ import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
/**
@@ -60,7 +65,8 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
+ "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, spec_json) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), spec_json = VALUES(spec_json)";
private static final String SPECIFIC_GET_STATEMENT_BASE = "SELECT spec_uri, spec, spec_json FROM %s WHERE ";
- private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri, spec, spec_json FROM %s";
+ private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri, spec, spec_json, modified_time FROM %s";
+ private static final String SPECIFIC_GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec, spec_json, modified_time FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";
private static final String SPECIFIC_CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR("
+ FlowSpec.Utils.maxFlowSpecUriLength()
+ ") NOT NULL, flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), flow_name VARCHAR("
@@ -110,6 +116,22 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
: MysqlSpecStore.this.specSerDe.deserialize(rs.getString(3).getBytes(Charsets.UTF_8));
}
+ @Override
+ public Spec extractSpecWithModificationTime(ResultSet rs) throws SQLException, IOException {
+ Spec spec = rs.getString(3) == null
+ ? MysqlSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()))
+ : MysqlSpecStore.this.specSerDe.deserialize(rs.getString(3).getBytes(Charsets.UTF_8));
+ // Set modified timestamp in flowSpec properties list
+ if (spec instanceof FlowSpec) {
+ long timestamp = rs.getTimestamp(FlowSpec.MODIFICATION_TIME_KEY).getTime();
+ FlowSpec flowSpec = (FlowSpec) spec;
+ Properties properties = flowSpec.getConfigAsProperties();
+ properties.setProperty(FlowSpec.MODIFICATION_TIME_KEY, String.valueOf(timestamp));
+ return flowSpec;
+ }
+ return spec;
+ }
+
@Override
protected String getTablelessInsertStatement() { return MysqlSpecStore.SPECIFIC_INSERT_STATEMENT; }
@Override
@@ -117,6 +139,8 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
@Override
protected String getTablelessGetAllStatement() { return MysqlSpecStore.SPECIFIC_GET_ALL_STATEMENT; }
@Override
+ protected String getTablelessGetBatchStatement() { return MysqlSpecStore.SPECIFIC_GET_SPECS_BATCH_STATEMENT; }
+ @Override
protected String getTablelessCreateTableStatement() { return MysqlSpecStore.SPECIFIC_CREATE_TABLE_STATEMENT; }
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 90659d333..73b2e052a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -153,6 +153,7 @@ public class MysqlSpecStoreTest {
@Test (dependsOnMethods = "testAddSpec")
public void testGetSpec() throws Exception {
FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+ removeModificationTimestampFromSpecs(result);
Assert.assertEquals(result, this.flowSpec1);
Collection<Spec> specs = this.specStore.getSpecs();
@@ -283,13 +284,24 @@ public class MysqlSpecStoreTest {
Assert.assertFalse(specs.contains(this.flowSpec4));
}
+ /**
+ * Removes the `modified_time` key ({@link FlowSpec#MODIFICATION_TIME_KEY}) from the FlowSpec config properties. It is
+ * for testing purposes only: without it, a FlowSpec created for a test and the same spec retrieved from the store
+ * would not compare equal. In practice, we would never encounter this issue as we only compare specs obtained from
+ * the store with the key mentioned.
+ */
+ public static void removeModificationTimestampFromSpecs(Spec spec) {
+ ((FlowSpec) spec).getConfigAsProperties().remove(FlowSpec.MODIFICATION_TIME_KEY);
+ }
+
@Test (dependsOnMethods = "testGetSpec")
public void testGetAllSpecPaginate() throws Exception {
/**
* Sorted order of the specStore configurations is flowSpec1, flowSpec2, flowSpec4
*/
// Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so return all 3 flowSpecs
- Collection<Spec> specs = this.specStore.getSpecs(0,10);
+ Collection<Spec> specs = this.specStore.getSpecsPaginated(0,10);
+ specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
Assert.assertEquals(specs.size(), 3);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
@@ -297,25 +309,31 @@ public class MysqlSpecStoreTest {
// Return all flowSpecs using the default get all specs function. Testing default functionality of returning everything
specs = this.specStore.getSpecs();
+ specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
Assert.assertEquals(specs.size(), 3);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
Assert.assertTrue(specs.contains(this.flowSpec4));
// Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
- specs = this.specStore.getSpecs(0,2);
+ specs = this.specStore.getSpecsPaginated(0,2);
+ specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
Assert.assertEquals(specs.size(), 2);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
Assert.assertFalse(specs.contains(this.flowSpec4));
- // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
- // Check that functionality for not including a start value is the same as including start value of 0
- specs = this.specStore.getSpecs(-1, 2);
- Assert.assertEquals(specs.size(), 2);
- Assert.assertTrue(specs.contains(this.flowSpec1));
- Assert.assertTrue(specs.contains(this.flowSpec2));
- Assert.assertFalse(specs.contains(this.flowSpec4));
+ // Return 0 flowSpecs when batch size is 0.
+ specs = this.specStore.getSpecsPaginated(2,0);
+ Assert.assertEquals(specs.size(), 0);
+
+ // Return 0 flowSpecs when start offset is past the end
+ specs = this.specStore.getSpecsPaginated(3,1);
+ Assert.assertEquals(specs.size(), 0);
+
+ // Check that we throw an error for incorrect inputs
+ Assert.assertThrows(IllegalArgumentException.class, () -> this.specStore.getSpecsPaginated(-1, 2));
+ Assert.assertThrows(IllegalArgumentException.class, () -> this.specStore.getSpecsPaginated(2, -4));
}
@Test (expectedExceptions = {IOException.class})
@@ -336,6 +354,7 @@ public class MysqlSpecStoreTest {
this.oldSpecStore.addSpec(this.flowSpec1);
FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri1);
+ removeModificationTimestampFromSpecs(spec);
Assert.assertEquals(spec, this.flowSpec1);
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
index 621fa3771..bcd77eaab 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
@@ -164,6 +164,7 @@ public class MysqlSpecStoreWithUpdateTest {
@Test (dependsOnMethods = "testAddSpec")
public void testGetSpec() throws Exception {
FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+ MysqlSpecStoreTest.removeModificationTimestampFromSpecs(result);
Assert.assertEquals(result, this.flowSpec1);
Collection<Spec> specs = this.specStore.getSpecs();
@@ -298,7 +299,9 @@ public class MysqlSpecStoreWithUpdateTest {
public void testUpdate() throws Exception{
long version = System.currentTimeMillis() /1000;
this.specStore.updateSpec(this.flowSpec4_update);
- Assert.assertEquals(((FlowSpec) this.specStore.getSpec(this.uri4)), flowSpec4_update);
+ FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri4);
+ MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec);
+ Assert.assertEquals(spec, flowSpec4_update);
Assert.expectThrows(IOException.class, () -> this.specStore.updateSpec(flowSpec4, version));
}
@@ -308,7 +311,8 @@ public class MysqlSpecStoreWithUpdateTest {
* Sorted order of the specStore configurations is flowSpec1, flowSpec2, flowSpec4
*/
// Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so return all 3 flowSpecs
- Collection<Spec> specs = this.specStore.getSpecs(0,10);
+ Collection<Spec> specs = this.specStore.getSpecsPaginated(0,10);
+ specs.forEach(spec -> MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec));
for (Spec spec: specs) {
System.out.println("test" + spec.getUri());
}
@@ -325,19 +329,15 @@ public class MysqlSpecStoreWithUpdateTest {
Assert.assertTrue(specs.contains(this.flowSpec4));
// Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
- specs = this.specStore.getSpecs(0,2);
+ specs = this.specStore.getSpecsPaginated(0,2);
+ specs.forEach(spec -> MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec));
Assert.assertEquals(specs.size(), 2);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
Assert.assertFalse(specs.contains(this.flowSpec4));
- // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
- // Check that functionality for not including a start value is the same as including start value of 0
- specs = this.specStore.getSpecs(-1, 2);
- Assert.assertEquals(specs.size(), 2);
- Assert.assertTrue(specs.contains(this.flowSpec1));
- Assert.assertTrue(specs.contains(this.flowSpec2));
- Assert.assertFalse(specs.contains(this.flowSpec4));
+ // Check that we throw an error for incorrect inputs
+ Assert.assertThrows(IllegalArgumentException.class, () -> this.specStore.getSpecsPaginated(-1, -4));
}
@Test (expectedExceptions = {IOException.class})
@@ -358,6 +358,7 @@ public class MysqlSpecStoreWithUpdateTest {
this.oldSpecStore.addSpec(this.flowSpec1);
FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri1);
+ MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec);
Assert.assertEquals(spec, this.flowSpec1);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 8e113909e..b8513f5a8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -439,6 +439,18 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
this.eventBus.register(this);
this.serviceLauncher.start();
+ // Wait until spec consumer service is running to set scheduler to active
+ if (this.configuration.isWarmStandbyEnabled()) {
+ while (!this.specStoreChangeMonitor.isRunning()) {
+ try {
+ LOGGER.info("Waiting for SpecStoreChangeMonitor to be started...");
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting for SpecStoreChangeMonitor to be started");
+ }
+ }
+ }
+
if (this.helixManager.isPresent()) {
// Subscribe to leadership changes
this.helixManager.get().addControllerListener((ControllerChangeListener) this::handleLeadershipChange);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 9c365e54f..54b9a6a26 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,25 +17,43 @@
package org.apache.gobblin.service.modules.scheduler;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricFilter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
@@ -44,9 +62,11 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
@@ -56,20 +76,9 @@ import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
-import org.apache.helix.HelixManager;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
@@ -97,12 +106,20 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
@Getter
protected final Map<String, Spec> scheduledFlowSpecs;
@Getter
+ protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
+ protected volatile int loadSpecsBatchSize = -1;
+ @Getter
private volatile boolean isActive;
private String serviceName;
+ private volatile Long averageGetSpecTimeValue = -1L;
+ private volatile Long timeToInitializeSchedulerValue = -1L;
+ private final ContextAwareGauge averageGetSpecTimeMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS, () -> this.averageGetSpecTimeValue);;
+ private final ContextAwareGauge batchSize = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE, () -> this.loadSpecsBatchSize);
+ private final ContextAwareGauge timeToInitalizeSchedulerMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS, () -> this.timeToInitializeSchedulerValue);
private static final MetricContext metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
GobblinServiceJobScheduler.class);
private static final ContextAwareMeter scheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
- private static final ContextAwareMeter nonScheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.NON_SCHEDULED_FLOW_METER);;
+ private static final ContextAwareMeter nonScheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.NON_SCHEDULED_FLOW_METER);
/**
* If current instances is nominated as a handler for DR traffic from down GaaS-Instance.
@@ -133,10 +150,20 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.helixManager = helixManager;
this.orchestrator = orchestrator;
this.scheduledFlowSpecs = Maps.newHashMap();
+ this.lastUpdatedTimeForFlowSpec = Maps.newHashMap();
+ this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
this.warmStandbyEnabled = warmStandbyEnabled;
this.quotaManager = quotaManager;
+ // Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
+ // of the scheduler. If one metric exists, then the others should as well.
+ MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+ if (metricContext.getGauges(filter).isEmpty()) {
+ metricContext.register(this.averageGetSpecTimeMillis);
+ metricContext.register(this.batchSize);
+ metricContext.register(timeToInitalizeSchedulerMillis);
+ }
}
public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
@@ -189,6 +216,19 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
}
+ /** Helps modify spec before adding to scheduler for adhoc flows */
+ private void addSpecHelperMethod(Spec spec) {
+ // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+ if (spec instanceof FlowSpec && PropertiesUtils
+ .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+ "false")) {
+ Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+ onAddSpec(modifiedSpec);
+ } else {
+ onAddSpec(spec);
+ }
+ }
+
/**
* Load all {@link FlowSpec}s from {@link FlowCatalog} as one of the initialization step,
* and make schedulers be aware of that.
@@ -196,12 +236,20 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
* If it is newly brought up as the DR handler, will load additional FlowSpecs and handle transition properly.
*/
private void scheduleSpecsFromCatalog() {
- Iterator<URI> specUris = null;
+ int numSpecs = this.flowCatalog.get().getSize();
long startTime = System.currentTimeMillis();
-
+ Iterator<URI> uriIterator;
+ HashSet<URI> urisLeftToSchedule = new HashSet<>();
try {
- specUris = this.flowCatalog.get().getSpecURIs();
+ uriIterator = this.flowCatalog.get().getSpecURIs();
+ while (uriIterator.hasNext()) {
+ urisLeftToSchedule.add(uriIterator.next());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ try {
// If current instances nominated as DR handler, will take additional URIS from FlowCatalog.
if (isNominatedDRHandler) {
// Synchronously cleaning the execution state for DR-applicable FlowSpecs
@@ -210,26 +258,51 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
clearRunningFlowState(drUris);
}
} catch (IOException e) {
- throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+ throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
}
- while (specUris.hasNext()) {
- Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
- try {
- // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
- if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
- (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
- Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
- onAddSpec(modifiedSpec);
- } else {
- onAddSpec(spec);
+ int startOffset = 0;
+ long batchGetStartTime;
+ long batchGetEndTime;
+
+ while (startOffset < numSpecs) {
+ batchGetStartTime = System.currentTimeMillis();
+ Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+ Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+ batchGetEndTime = System.currentTimeMillis();
+
+ while (batchOfSpecsIterator.hasNext()) {
+ Spec spec = batchOfSpecsIterator.next();
+ try {
+ addSpecHelperMethod(spec);
+ urisLeftToSchedule.remove(spec.getUri());
+ } catch (Exception e) {
+ // If there is an uncaught error thrown during compilation, log it and continue adding flows
+ _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
- } catch (Exception e) {
- // If there is an uncaught error thrown during compilation, log it and continue adding flows
- _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
}
+ startOffset += this.loadSpecsBatchSize;
+ // Divide by the number of specs actually returned so the average spec get time stays accurate for the last batch,
+ // which may be smaller than the loadSpecsBatchSize
+ averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
}
+
+ // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+ Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+ while (urisLeft.hasNext()) {
+ URI uri = urisLeft.next();
+ try {
+ Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
+ addSpecHelperMethod(spec);
+ } catch (Exception e) {
+ // If there is an uncaught error thrown during compilation, log it and continue adding flows
+ _log.error("Could not schedule spec uri {} from flowCatalog due to ", uri, e);
+ }
+
+ }
+
this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
+ this.timeToInitializeSchedulerValue = System.currentTimeMillis() - startTime;
}
/**
@@ -343,8 +416,27 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
}
+ // Compare the modification timestamp of the spec being added if the scheduler is being initialized. Ideally we
+ // don't even want to do the same update twice, as it will kill the existing flow and reschedule it unnecessarily
+ Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
+ String uriString = flowSpec.getUri().toString();
+ Boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+ // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+ // enabled), the spec is not in the scheduler, or no last updated time is recorded for it, assume it's the most recent
+ if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ // For run-immediately flows with a schedule the modified_time would remain the same
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString).compareTo(modificationTime) > 0
+ || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {
+ _log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
+ addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+ return new AddSpecResponse(response);
+ }
+ }
+
// todo : we should probably not schedule a flow if it is a runOnce flow
this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+ this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);
if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
_log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
@@ -353,6 +445,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
} catch (JobException je) {
_log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
+ this.lastUpdatedTimeForFlowSpec.remove(flowSpecUri.toString());
return null;
}
if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
@@ -378,6 +471,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
if (this.scheduledFlowSpecs.containsKey(specURI.toString())) {
_log.info("Unscheduling flowSpec " + specURI + "/" + specVersion);
this.scheduledFlowSpecs.remove(specURI.toString());
+ this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
unscheduleJob(specURI.toString());
} else {
throw new JobException(String.format(
@@ -523,6 +617,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, new Properties(), false);
GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
+ GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
}
} catch (JobException je) {
_log.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);