You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/08/17 17:51:53 UTC

[gobblin] branch master updated: [GOBBLIN-1518] create getSize api for spec store which is faster than doing getAll (#3367)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bea210a  [GOBBLIN-1518] create getSize api for spec store which is faster than doing getAll (#3367)
bea210a is described below

commit bea210a70fad93951eedb49590b9bcca4615e255
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Aug 17 23:21:47 2021 +0530

    [GOBBLIN-1518] create getSize api for spec store which is faster than doing getAll (#3367)
    
    During memory profiling we noticed that a lot of CPU cycles are spent calling getSpecs().size() from the metric reporting thread, which runs every n seconds. Right now there is no API to get the size of the spec store.
    Calling getAllSpecs and then size() on the result does a lot of unnecessary work, namely reading the content of each spec.
    This PR provides a method to get the size of the spec store directly, without doing any unnecessary work.
---
 .../gobblin/runtime/api/InstrumentedSpecStore.java     | 13 +++++++++++++
 .../org/apache/gobblin/runtime/api/SpecCatalog.java    |  4 +++-
 .../java/org/apache/gobblin/runtime/api/SpecStore.java |  6 ++++++
 .../gobblin/runtime/spec_catalog/FlowCatalog.java      | 12 ++++++++++++
 .../gobblin/runtime/spec_catalog/TopologyCatalog.java  |  9 +++++++++
 .../apache/gobblin/runtime/spec_store/FSSpecStore.java | 17 +++++++++++++++++
 .../gobblin/runtime/spec_store/MysqlSpecStore.java     | 13 +++++++++++++
 .../gobblin/runtime/spec_catalog/FlowCatalogTest.java  | 18 +++++++++++-------
 .../gobblin/runtime/spec_store/FSSpecStoreTest.java    |  7 ++++---
 .../gobblin/runtime/spec_store/MysqlSpecStoreTest.java |  4 ++++
 10 files changed, 92 insertions(+), 11 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index e2c206e..94585a0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -164,6 +164,18 @@ public abstract class InstrumentedSpecStore implements SpecStore {
     }
   }
 
+  @Override
+  public int getSize() throws IOException {
+    if (!instrumentationEnabled) {
+      return getSizeImpl();
+    } else {
+      long startTimeMillis = System.currentTimeMillis();
+      int size = getSizeImpl();
+      Instrumented.updateTimer(this.getAllTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+      return size;
+    }
+  }
+
   public abstract void addSpecImpl(Spec spec) throws IOException;
   public abstract Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException;
   public abstract boolean existsImpl(URI specUri) throws IOException;
@@ -171,6 +183,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
   public abstract Collection<Spec> getSpecsImpl() throws IOException;
   public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+  public abstract int getSizeImpl() throws IOException;
 
   /** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
   public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index ebb88e7..72611fd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -51,6 +51,8 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
    * */
   Collection<Spec> getSpecs();
 
+  int getSize();
+
   /** Metrics for the spec catalog; null if
    * ({@link #isInstrumentationEnabled()}) is false. */
   SpecCatalog.StandardMetrics getMetrics();
@@ -110,7 +112,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
       this.totalUpdatedSpecs = new AtomicLong(0);
       this.numActiveSpecs = metricsContext.newContextAwareGauge(NUM_ACTIVE_SPECS_NAME,  ()->{
           long startTime = System.currentTimeMillis();
-          int size = specCatalog.getSpecs().size();
+          int size = specCatalog.getSize();
           updateGetSpecTime(startTime);
           return size;
       });
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 25f8f68..259d5c7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -139,4 +139,10 @@ public interface SpecStore {
    * with root-level FileSystem directory.
    */
   Optional<URI> getSpecStoreURI();
+
+  /***
+   * Returns the number of {@link Spec}s in the {@link SpecStore}.
+   * @throws IOException Exception in retrieving the count of {@link Spec}s.
+   */
+  int getSize() throws IOException;
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 3cb2d90..c9c17b0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -266,6 +266,18 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     }
   }
 
+  /**
+   * Get number of specs from {@link SpecStore}
+   */
+  @Override
+  public int getSize() {
+    try {
+      return specStore.getSize();
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot retrieve number of specs from Spec store", e);
+    }
+  }
+
   public boolean exists(URI uri) {
     try {
       return specStore.exists(uri);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 6464d9f..64852e4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -233,6 +233,15 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
   }
 
   @Override
+  public int getSize() {
+    try {
+      return specStore.getSize();
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot retrieve number of specs from Spec store", e);
+    }
+  }
+
+  @Override
   public Spec getSpecs(URI uri) throws SpecNotFoundException {
     try {
       return specStore.getSpec(uri);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 14a3069..63e829a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -352,4 +352,21 @@ public class FSSpecStore extends InstrumentedSpecStore {
   protected URI getURIFromPath(Path fsPath, Path fsSpecStoreDirPath) {
     return PathUtils.relativizePath(fsPath, fsSpecStoreDirPath).toUri();
   }
+
+  public int getSizeImpl() throws IOException {
+    return getSizeImpl(this.fsSpecStoreDirPath);
+  }
+
+  private int getSizeImpl(Path directory) throws IOException {
+    int specs = 0;
+    FileStatus[] fileStatuses = fs.listStatus(directory);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        specs += getSizeImpl(fileStatus.getPath());
+      } else {
+        specs++;
+      }
+    }
+    return specs;
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index a9364d8..032da32 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -84,6 +84,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
   private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
   private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
 
   protected final DataSource dataSource;
   protected final String tableName;
@@ -252,6 +253,18 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
   }
 
   @Override
+  public int getSizeImpl() throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(String.format(GET_SIZE_STATEMENT, this.tableName));
+        ResultSet resultSet = statement.executeQuery()) {
+      resultSet.next();
+      return resultSet.getInt(1);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
   public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT_WITH_TAG, this.tableName))) {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index d69d4e5..9147b7f 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -147,7 +147,7 @@ public class FlowCatalogTest {
       FlowSpec flowSpec = (FlowSpec) spec;
       logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
     }
-    Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+    Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
 
     // Create and add Spec
     this.flowCatalog.put(flowSpec);
@@ -160,7 +160,8 @@ public class FlowCatalogTest {
       flowSpec = (FlowSpec) spec;
       logger.info("[After Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
     }
-    Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition");
+    Assert.assertEquals(specs.size(), 1, "Spec store should contain 1 Spec after addition");
+    Assert.assertEquals(flowCatalog.getSize(), 1, "Spec store should contain 1 Spec after addition");
   }
 
   @Test (dependsOnMethods = "createFlowSpec")
@@ -178,8 +179,8 @@ public class FlowCatalogTest {
       FlowSpec flowSpec = (FlowSpec) spec;
       logger.info("[Before Delete] Spec " + i++ + ": " + gson.toJson(flowSpec));
     }
-    Assert.assertTrue(specs.size() == 1, "Spec store should initially have 1 Spec before deletion");
-
+    Assert.assertEquals(specs.size(), 1, "Spec store should initially have 1 Spec before deletion");
+    Assert.assertEquals(flowCatalog.getSize(), 1, "Spec store should initially have 1 Spec before deletion");
     this.flowCatalog.remove(flowSpec.getUri());
 
     // List Specs after adding
@@ -190,7 +191,8 @@ public class FlowCatalogTest {
       flowSpec = (FlowSpec) spec;
       logger.info("[After Delete] Spec " + i++ + ": " + gson.toJson(flowSpec));
     }
-    Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion");
+    Assert.assertEquals(specs.size(), 0, "Spec store should be empty after deletion");
+    Assert.assertEquals(flowCatalog.getSize(), 0, "Spec store should be empty after deletion");
   }
 
   @Test (dependsOnMethods = "deleteFlowSpec")
@@ -202,7 +204,7 @@ public class FlowCatalogTest {
       FlowSpec flowSpec = (FlowSpec) spec;
       logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
     }
-    Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+    Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
 
     // Create and add Spec
     FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
@@ -214,6 +216,7 @@ public class FlowCatalogTest {
     // Spec should be rejected from being stored
     specs = flowCatalog.getSpecs();
     Assert.assertEquals(specs.size(), 0);
+    Assert.assertEquals(flowCatalog.getSize(), 0);
   }
 
   @Test (dependsOnMethods = "testRejectBadFlow")
@@ -226,7 +229,7 @@ public class FlowCatalogTest {
       FlowSpec flowSpec = (FlowSpec) spec;
       logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
     }
-    Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
+    Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
 
     // Create and add Spec
 
@@ -235,6 +238,7 @@ public class FlowCatalogTest {
     // Spec should be rejected from being stored
     specs = flowCatalog.getSpecs();
     Assert.assertEquals(specs.size(), 0);
+    Assert.assertEquals(flowCatalog.getSize(), 0);
   }
 
   public static URI computeFlowSpecURI() {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
index a952f5d..daf296a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
@@ -88,8 +88,8 @@ public class FSSpecStoreTest {
     // The fail and serDe datasets wouldn't survive
     Assert.assertEquals(specList.size(), 2);
     for (Spec spec: specList) {
-      Assert.assertTrue(!spec.getDescription().contains("spec_fail"));
-      Assert.assertTrue(!spec.getDescription().contains("serDeFail"));
+      Assert.assertFalse(spec.getDescription().contains("spec_fail"));
+      Assert.assertFalse(spec.getDescription().contains("serDeFail"));
     }
   }
 
@@ -109,7 +109,7 @@ public class FSSpecStoreTest {
         SerializationUtils.deserialize(ByteStreams.toByteArray(fis));
 
         // This line should never be reached since we generate SerDe Exception on purpose.
-        Assert.assertTrue(false);
+        Assert.fail();
         return null;
       }
       else return initFlowSpec(Files.createTempDir().getAbsolutePath());
@@ -149,6 +149,7 @@ public class FSSpecStoreTest {
     }
 
     Assert.assertEquals(count, 3);
+    Assert.assertEquals(fsSpecStore.getSize(), 3);
     Assert.assertTrue(result.contains(specURI0));
     Assert.assertTrue(result.contains(specURI1));
     Assert.assertTrue(result.contains(specURI2));
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 63b9141..b382e76 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -139,6 +139,8 @@ public class MysqlSpecStoreTest {
     this.specStore.addSpec(this.flowSpec1);
     this.specStore.addSpec(this.flowSpec2);
     this.specStore.addSpec(this.flowSpec4);
+
+    Assert.assertEquals(this.specStore.getSize(), 3);
     Assert.assertTrue(this.specStore.exists(this.uri1));
     Assert.assertTrue(this.specStore.exists(this.uri2));
     Assert.assertTrue(this.specStore.exists(this.uri4));
@@ -245,7 +247,9 @@ public class MysqlSpecStoreTest {
 
   @Test (dependsOnMethods = "testGetSpecWithTag")
   public void testDeleteSpec() throws Exception {
+    Assert.assertEquals(this.specStore.getSize(), 5);
     this.specStore.deleteSpec(this.uri1);
+    Assert.assertEquals(this.specStore.getSize(), 4);
     Assert.assertFalse(this.specStore.exists(this.uri1));
   }