You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/08/17 17:51:53 UTC
[gobblin] branch master updated: [GOBBLIN-1518] create getSize api
for spec store which is faster than doing getAll (#3367)
This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bea210a [GOBBLIN-1518] create getSize api for spec store which is faster than doing getAll (#3367)
bea210a is described below
commit bea210a70fad93951eedb49590b9bcca4615e255
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Aug 17 23:21:47 2021 +0530
[GOBBLIN-1518] create getSize api for spec store which is faster than doing getAll (#3367)
During memory profiling, we noticed that a lot of CPU cycles are spent calling getSpecs().size() from the metric-reporting thread, which runs every n seconds. Currently there is no API to get the size of the spec store.
Fetching all specs and then calling size() on the result does a lot of unnecessary work, namely reading the content of every spec.
This PR provides a method to directly get the size of the spec store without doing that unnecessary work.
---
.../gobblin/runtime/api/InstrumentedSpecStore.java | 13 +++++++++++++
.../org/apache/gobblin/runtime/api/SpecCatalog.java | 4 +++-
.../java/org/apache/gobblin/runtime/api/SpecStore.java | 6 ++++++
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 12 ++++++++++++
.../gobblin/runtime/spec_catalog/TopologyCatalog.java | 9 +++++++++
.../apache/gobblin/runtime/spec_store/FSSpecStore.java | 17 +++++++++++++++++
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 13 +++++++++++++
.../gobblin/runtime/spec_catalog/FlowCatalogTest.java | 18 +++++++++++-------
.../gobblin/runtime/spec_store/FSSpecStoreTest.java | 7 ++++---
.../gobblin/runtime/spec_store/MysqlSpecStoreTest.java | 4 ++++
10 files changed, 92 insertions(+), 11 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index e2c206e..94585a0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -164,6 +164,18 @@ public abstract class InstrumentedSpecStore implements SpecStore {
}
}
+ @Override
+ public int getSize() throws IOException {
+ if (!instrumentationEnabled) {
+ return getSizeImpl();
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ int size = getSizeImpl();
+ Instrumented.updateTimer(this.getAllTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return size;
+ }
+ }
+
public abstract void addSpecImpl(Spec spec) throws IOException;
public abstract Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException;
public abstract boolean existsImpl(URI specUri) throws IOException;
@@ -171,6 +183,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
public abstract Collection<Spec> getSpecsImpl() throws IOException;
public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+ public abstract int getSizeImpl() throws IOException;
/** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index ebb88e7..72611fd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -51,6 +51,8 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
* */
Collection<Spec> getSpecs();
+ int getSize();
+
/** Metrics for the spec catalog; null if
* ({@link #isInstrumentationEnabled()}) is false. */
SpecCatalog.StandardMetrics getMetrics();
@@ -110,7 +112,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
this.totalUpdatedSpecs = new AtomicLong(0);
this.numActiveSpecs = metricsContext.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME, ()->{
long startTime = System.currentTimeMillis();
- int size = specCatalog.getSpecs().size();
+ int size = specCatalog.getSize();
updateGetSpecTime(startTime);
return size;
});
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 25f8f68..259d5c7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -139,4 +139,10 @@ public interface SpecStore {
* with root-level FileSystem directory.
*/
Optional<URI> getSpecStoreURI();
+
+ /***
+ * Returns the number of {@link Spec}s in the {@link SpecStore}.
+ * @throws IOException Exception in retrieving the count of {@link Spec}s.
+ */
+ int getSize() throws IOException;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 3cb2d90..c9c17b0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -266,6 +266,18 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
}
+ /**
+ * Get number of specs from {@link SpecStore}
+ */
+ @Override
+ public int getSize() {
+ try {
+ return specStore.getSize();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot retrieve number of specs from Spec store", e);
+ }
+ }
+
public boolean exists(URI uri) {
try {
return specStore.exists(uri);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 6464d9f..64852e4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -233,6 +233,15 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
}
@Override
+ public int getSize() {
+ try {
+ return specStore.getSize();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot retrieve number of specs from Spec store", e);
+ }
+ }
+
+ @Override
public Spec getSpecs(URI uri) throws SpecNotFoundException {
try {
return specStore.getSpec(uri);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 14a3069..63e829a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -352,4 +352,21 @@ public class FSSpecStore extends InstrumentedSpecStore {
protected URI getURIFromPath(Path fsPath, Path fsSpecStoreDirPath) {
return PathUtils.relativizePath(fsPath, fsSpecStoreDirPath).toUri();
}
+
+ public int getSizeImpl() throws IOException {
+ return getSizeImpl(this.fsSpecStoreDirPath);
+ }
+
+ private int getSizeImpl(Path directory) throws IOException {
+ int specs = 0;
+ FileStatus[] fileStatuses = fs.listStatus(directory);
+ for (FileStatus fileStatus : fileStatuses) {
+ if (fileStatus.isDirectory()) {
+ specs += getSizeImpl(fileStatus.getPath());
+ } else {
+ specs++;
+ }
+ }
+ return specs;
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index a9364d8..032da32 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -84,6 +84,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
+ private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
protected final DataSource dataSource;
protected final String tableName;
@@ -252,6 +253,18 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
}
@Override
+ public int getSizeImpl() throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(String.format(GET_SIZE_STATEMENT, this.tableName));
+ ResultSet resultSet = statement.executeQuery()) {
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT_WITH_TAG, this.tableName))) {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index d69d4e5..9147b7f 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -147,7 +147,7 @@ public class FlowCatalogTest {
FlowSpec flowSpec = (FlowSpec) spec;
logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
- Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+ Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
// Create and add Spec
this.flowCatalog.put(flowSpec);
@@ -160,7 +160,8 @@ public class FlowCatalogTest {
flowSpec = (FlowSpec) spec;
logger.info("[After Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
- Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition");
+ Assert.assertEquals(specs.size(), 1, "Spec store should contain 1 Spec after addition");
+ Assert.assertEquals(flowCatalog.getSize(), 1, "Spec store should contain 1 Spec after addition");
}
@Test (dependsOnMethods = "createFlowSpec")
@@ -178,8 +179,8 @@ public class FlowCatalogTest {
FlowSpec flowSpec = (FlowSpec) spec;
logger.info("[Before Delete] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
- Assert.assertTrue(specs.size() == 1, "Spec store should initially have 1 Spec before deletion");
-
+ Assert.assertEquals(specs.size(), 1, "Spec store should initially have 1 Spec before deletion");
+ Assert.assertEquals(flowCatalog.getSize(), 1, "Spec store should initially have 1 Spec before deletion");
this.flowCatalog.remove(flowSpec.getUri());
// List Specs after adding
@@ -190,7 +191,8 @@ public class FlowCatalogTest {
flowSpec = (FlowSpec) spec;
logger.info("[After Delete] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
- Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion");
+ Assert.assertEquals(specs.size(), 0, "Spec store should be empty after deletion");
+ Assert.assertEquals(flowCatalog.getSize(), 0, "Spec store should be empty after deletion");
}
@Test (dependsOnMethods = "deleteFlowSpec")
@@ -202,7 +204,7 @@ public class FlowCatalogTest {
FlowSpec flowSpec = (FlowSpec) spec;
logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
- Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+ Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
// Create and add Spec
FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
@@ -214,6 +216,7 @@ public class FlowCatalogTest {
// Spec should be rejected from being stored
specs = flowCatalog.getSpecs();
Assert.assertEquals(specs.size(), 0);
+ Assert.assertEquals(flowCatalog.getSize(), 0);
}
@Test (dependsOnMethods = "testRejectBadFlow")
@@ -226,7 +229,7 @@ public class FlowCatalogTest {
FlowSpec flowSpec = (FlowSpec) spec;
logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
- Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+ Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
// Create and add Spec
@@ -235,6 +238,7 @@ public class FlowCatalogTest {
// Spec should be rejected from being stored
specs = flowCatalog.getSpecs();
Assert.assertEquals(specs.size(), 0);
+ Assert.assertEquals(flowCatalog.getSize(), 0);
}
public static URI computeFlowSpecURI() {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
index a952f5d..daf296a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
@@ -88,8 +88,8 @@ public class FSSpecStoreTest {
// The fail and serDe datasets wouldn't survive
Assert.assertEquals(specList.size(), 2);
for (Spec spec: specList) {
- Assert.assertTrue(!spec.getDescription().contains("spec_fail"));
- Assert.assertTrue(!spec.getDescription().contains("serDeFail"));
+ Assert.assertFalse(spec.getDescription().contains("spec_fail"));
+ Assert.assertFalse(spec.getDescription().contains("serDeFail"));
}
}
@@ -109,7 +109,7 @@ public class FSSpecStoreTest {
SerializationUtils.deserialize(ByteStreams.toByteArray(fis));
// This line should never be reached since we generate SerDe Exception on purpose.
- Assert.assertTrue(false);
+ Assert.fail();
return null;
}
else return initFlowSpec(Files.createTempDir().getAbsolutePath());
@@ -149,6 +149,7 @@ public class FSSpecStoreTest {
}
Assert.assertEquals(count, 3);
+ Assert.assertEquals(fsSpecStore.getSize(), 3);
Assert.assertTrue(result.contains(specURI0));
Assert.assertTrue(result.contains(specURI1));
Assert.assertTrue(result.contains(specURI2));
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 63b9141..b382e76 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -139,6 +139,8 @@ public class MysqlSpecStoreTest {
this.specStore.addSpec(this.flowSpec1);
this.specStore.addSpec(this.flowSpec2);
this.specStore.addSpec(this.flowSpec4);
+
+ Assert.assertEquals(this.specStore.getSize(), 3);
Assert.assertTrue(this.specStore.exists(this.uri1));
Assert.assertTrue(this.specStore.exists(this.uri2));
Assert.assertTrue(this.specStore.exists(this.uri4));
@@ -245,7 +247,9 @@ public class MysqlSpecStoreTest {
@Test (dependsOnMethods = "testGetSpecWithTag")
public void testDeleteSpec() throws Exception {
+ Assert.assertEquals(this.specStore.getSize(), 5);
this.specStore.deleteSpec(this.uri1);
+ Assert.assertEquals(this.specStore.getSize(), 4);
Assert.assertFalse(this.specStore.exists(this.uri1));
}