You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/28 11:09:55 UTC

incubator-eagle git commit: EAGLE-683: Improve metadata store performance

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 38d46c8bb -> e3f358c84


EAGLE-683: Improve metadata store performance

Currently, we enable periodically schedule in coordinator service itself.
If this is enabled, the schedule spec will finally used up metadata storage.

So we need to improve metadata store performance by:
1. Disable periodically schedule by default (storage like mysql doesn't have capped feature).
2. For mongodb storage, use capped collection for schedule_specs, policy_snapshots,
streamSnapshots, groupSpecs, alertSpecs, monitoredStreams, spoutSpecs (all schedule spec related collection).

Author: Zeng, Bryant
Reviewer: ralphsu

This closes #566


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e3f358c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e3f358c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e3f358c8

Branch: refs/heads/master
Commit: e3f358c8462af497fdc9f4498dbf374599af57e6
Parents: 38d46c8
Author: mizeng <mi...@ebaysf.com>
Authored: Wed Oct 26 14:22:33 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Oct 28 19:09:59 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/coordinator/Coordinator.java    |  6 ++-
 .../metadata/impl/MongoMetadataDaoImpl.java     | 53 ++++++++++++++++----
 .../alert/resource/impl/MongoImplTest.java      |  3 --
 .../src/test/resources/application-mongo.conf   |  3 +-
 4 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e3f358c8/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index e0bd5c3..deeeec9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -280,10 +280,12 @@ public class Coordinator {
                 });
                 scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
 
-                //
+                // disable periodically schedule by default due for the sake of Metadata store performance
+                /***
                 scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
                     CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS);
-
+                ***/
+                
                 Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
                 LOG.info("Eagle Coordinator started ...");
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e3f358c8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index f8186ac..088896c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -25,6 +25,8 @@ import com.mongodb.MongoClient;
 import com.mongodb.MongoClientURI;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import com.mongodb.client.model.CreateCollectionOptions;
 import com.mongodb.client.model.IndexOptions;
 import com.mongodb.client.model.UpdateOptions;
 import com.mongodb.client.result.DeleteResult;
@@ -59,6 +61,8 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
     private static final String DB_NAME = "ump_alert_metadata";
     private static final Logger LOG = LoggerFactory.getLogger(MongoMetadataDaoImpl.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private static final int DEFAULT_CAPPED_MAX_SIZE = 500 * 1024 * 1024;
+    private static final int DEFAULT_CAPPED_MAX_DOCUMENTS = 20000;
 
     static {
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -66,6 +70,8 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
 
     private final String connection;
     private final MongoClient client;
+    private final int cappedMaxSize;
+    private final int cappedMaxDocuments;
 
     private MongoDatabase db;
     private MongoCollection<Document> cluster;
@@ -91,10 +97,39 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
     @Inject
     public MongoMetadataDaoImpl(Config config) {
         this.connection = config.getString("connection");
+        this.cappedMaxSize = config.hasPath("cappedMaxSize") ? config.getInt("cappedMaxSize") : DEFAULT_CAPPED_MAX_SIZE;
+        this.cappedMaxDocuments = config.hasPath("cappedMaxDocuments") ? config.getInt("cappedMaxDocuments") : DEFAULT_CAPPED_MAX_DOCUMENTS;
         this.client = new MongoClient(new MongoClientURI(this.connection));
         init();
     }
 
+    private boolean isCollectionExists(String collectionName) {
+        boolean result = false;
+        MongoIterable<String> allCollections = db.listCollectionNames();
+        for ( String collection : allCollections ) {
+            if (collection.equals(collectionName)) {
+                result = true;
+                break;
+            }
+        }
+
+        return result;
+    }
+
+    private MongoCollection<Document> getCollection(String collectionName) {
+        // first check if collection exists, if not then create a new collection with cappedSize
+        if (!isCollectionExists(collectionName)) {
+            CreateCollectionOptions option = new CreateCollectionOptions();
+            option.capped(true);
+            option.maxDocuments(cappedMaxDocuments);
+            option.sizeInBytes(cappedMaxSize);
+            db.createCollection(collectionName, option);
+        }
+
+        return db.getCollection(collectionName);
+
+    }
+
     private void init() {
         db = client.getDatabase(DB_NAME);
         IndexOptions io = new IndexOptions().background(true).name("nameIndex");
@@ -138,10 +173,10 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
         BsonDocument doc1 = new BsonDocument();
         IndexOptions io1 = new IndexOptions().background(true).name("versionIndex");
         doc1.append("version", new BsonInt32(1));
-        scheduleStates = db.getCollection("schedule_specs");
+        scheduleStates = getCollection("schedule_specs");
         scheduleStates.createIndex(doc1, io1);
 
-        spoutSpecs = db.getCollection("spoutSpecs");
+        spoutSpecs = getCollection("spoutSpecs");
         {
             IndexOptions ioInternal = new IndexOptions().background(true).name("topologyIdIndex");
             BsonDocument docInternal = new BsonDocument();
@@ -149,7 +184,7 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             spoutSpecs.createIndex(docInternal, ioInternal);
         }
 
-        alertSpecs = db.getCollection("alertSpecs");
+        alertSpecs = getCollection("alertSpecs");
         {
             IndexOptions ioInternal = new IndexOptions().background(true).name("topologyNameIndex");
             BsonDocument docInternal = new BsonDocument();
@@ -157,22 +192,22 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
             alertSpecs.createIndex(docInternal, ioInternal);
         }
 
-        groupSpecs = db.getCollection("groupSpecs");
+        groupSpecs = getCollection("groupSpecs");
         groupSpecs.createIndex(doc1, io1);
 
-        publishSpecs = db.getCollection("publishSpecs");
+        publishSpecs = getCollection("publishSpecs");
         publishSpecs.createIndex(doc1, io1);
 
-        policySnapshots = db.getCollection("policySnapshots");
+        policySnapshots = getCollection("policySnapshots");
         policySnapshots.createIndex(doc1, io);
 
-        streamSnapshots = db.getCollection("streamSnapshots");
+        streamSnapshots = getCollection("streamSnapshots");
         streamSnapshots.createIndex(doc1, io);
 
-        monitoredStreams = db.getCollection("monitoredStreams");
+        monitoredStreams = getCollection("monitoredStreams");
         monitoredStreams.createIndex(doc1, io);
 
-        assignments = db.getCollection("assignments");
+        assignments = getCollection("assignments");
         assignments.createIndex(doc1, io1);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e3f358c8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
index ff83b80..213943d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
@@ -203,9 +203,6 @@ public class MongoImplTest {
             Assert.assertEquals(200, result.code);
             List<StreamDefinition> assigns = dao.listStreams();
             Assert.assertEquals(1, assigns.size());
-            dao.removeStream("stream");
-            assigns = dao.listStreams();
-            Assert.assertEquals(0, assigns.size());
         }
         // alert
         {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e3f358c8/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mongo.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mongo.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mongo.conf
index fbb99de..bdc632c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mongo.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mongo.conf
@@ -16,6 +16,7 @@
 {
   "datastore": {
     "metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl",
-    "connection": "mongodb://localhost:27017"
+    "connection": "mongodb://localhost:27017",
+    "cappedSize": 20000
   }
 }