You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/10/24 13:40:33 UTC

incubator-rya git commit: RYA-393 Fixed MongoDB DAO Batch Writer from dropping statements. Closes #236

Repository: incubator-rya
Updated Branches:
  refs/heads/master 853e0eea7 -> b372ebcdb


RYA-393 Fixed MongoDB DAO Batch Writer from dropping statements. Closes #236

Fixed secondary indexers so they flush when the DAO flushes. Indexers now close when the DAO closes. The MongoDB batch writer's "Queue Full Checker" thread is no longer created as a daemon thread.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b372ebcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b372ebcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b372ebcd

Branch: refs/heads/master
Commit: b372ebcdb43d19315b7d430ae55fedf45654245c
Parents: 853e0ee
Author: eric.white <Er...@parsons.com>
Authored: Wed Oct 4 11:21:06 2017 -0400
Committer: Aaron Mihalik <aa...@gmail.com>
Committed: Tue Oct 24 09:39:13 2017 -0400

----------------------------------------------------------------------
 .../org/apache/rya/accumulo/AccumuloRyaDAO.java |  11 ++
 .../org/apache/rya/mongodb/MongoDBRyaDAO.java   |  18 ++
 .../rya/mongodb/batch/MongoDbBatchWriter.java   |  11 +-
 .../batch/collection/MongoCollectionType.java   |   3 +-
 .../rya/mongodb/MongoDBRyaBatchWriterIT.java    | 168 +++++++++++++++++++
 .../org/apache/rya/mongodb/MongoDBRyaDAOIT.java |   8 +-
 .../org/apache/rya/mongodb/MongoTestBase.java   |  17 ++
 .../indexing/mongodb/AbstractMongoIndexer.java  |   6 +
 8 files changed, 239 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
index 8c99e44..a8350d9 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -472,11 +472,22 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
     public void flush() throws RyaDAOException {
         try {
             mt_bw.flush();
+            flushIndexers();
         } catch (final MutationsRejectedException e) {
             throw new RyaDAOException(e);
         }
     }
 
+    private void flushIndexers() throws RyaDAOException {
+        for (final AccumuloIndexer indexer : secondaryIndexers) {
+            try {
+                indexer.flush();
+            } catch (final IOException e) {
+                logger.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e);
+            }
+        }
+    }
+
     protected String[] getTables() {
         // core tables
         final List<String> tableNames = Lists.newArrayList(

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
index fe0f6f9..d263b9c 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
@@ -171,6 +171,13 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
         } catch (final MongoDbBatchWriterException e) {
             throw new RyaDAOException("Error shutting down MongoDB batch writer", e);
         }
+        for(final MongoSecondaryIndex indexer : secondaryIndexers) {
+            try {
+                indexer.close();
+            } catch (final IOException e) {
+                log.error("Error closing indexer: " + indexer.getClass().getSimpleName(), e);
+            }
+        }
         if (mongoClient != null) {
             mongoClient.close();
         }
@@ -314,8 +321,19 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
     public void flush() throws RyaDAOException {
         try {
             mongoDbBatchWriter.flush();
+            flushIndexers();
         } catch (final MongoDbBatchWriterException e) {
             throw new RyaDAOException("Error flushing data.", e);
         }
     }
+
+    private void flushIndexers() throws RyaDAOException {
+        for (final MongoSecondaryIndex indexer : secondaryIndexers) {
+            try {
+                indexer.flush();
+            } catch (final IOException e) {
+                log.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
index 2f52b5c..b609276 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
@@ -35,6 +35,8 @@ import org.apache.log4j.Logger;
 import org.apache.rya.mongodb.batch.collection.CollectionType;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.mongodb.DuplicateKeyException;
+import com.mongodb.MongoBulkWriteException;
 
 /**
  * Handles batch writing MongoDB statement objects to the repository. It takes
@@ -95,7 +97,6 @@ public class MongoDbBatchWriter<T> {
 
     private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder()
         .setNameFormat("Queue Full Checker Thread - %d")
-        .setDaemon(true)
         .build();
 
     /**
@@ -218,6 +219,14 @@ public class MongoDbBatchWriter<T> {
             if (!batch.isEmpty()) {
                 collectionType.insertMany(batch);
             }
+        } catch (final DuplicateKeyException e) {
+            log.warn(e); // Suppress the stack trace so log doesn't get flooded.
+        } catch (final MongoBulkWriteException e) {
+            if (e.getMessage().contains("duplicate key error")) {
+                log.warn(e); // Suppress the stack trace so log doesn't get flooded.
+            } else {
+                throw new MongoDbBatchWriterException("Error flushing statements", e);
+            }
         } catch (final Exception e) {
             throw new MongoDbBatchWriterException("Error flushing statements", e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
index 8fb796a..11f2dc1 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.bson.Document;
 
 import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.InsertManyOptions;
 
 /**
  * Provides access to the {@link MongoCollection} type.
@@ -47,6 +48,6 @@ public class MongoCollectionType implements CollectionType<Document> {
 
     @Override
     public void insertMany(final List<Document> items) {
-        collection.insertMany(items);
+        collection.insertMany(items, new InsertManyOptions().ordered(false));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java
new file mode 100644
index 0000000..68bbc27
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
+import org.apache.rya.mongodb.batch.collection.MongoCollectionType;
+import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+
+/**
+ * Integration tests for the {@link MongoDbBatchWriter}.
+ */
+public class MongoDBRyaBatchWriterIT extends MongoTestBase {
+    private MongoDBRyaDAO dao;
+
+    private static void setupLogging() {
+        BasicConfigurator.configure();
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws Exception {
+        setupLogging();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        conf.setBoolean("rya.mongodb.dao.flusheachupdate", false);
+        conf.setInt("rya.mongodb.dao.batchwriter.size", 50_000);
+        conf.setLong("rya.mongodb.dao.batchwriter.flushtime", 100L);
+
+        final MongoClient client = super.getMongoClient();
+        dao = new MongoDBRyaDAO(conf, client);
+    }
+
+    @Test
+    public void testDuplicateKeys() throws Exception {
+        final List<RyaStatement> statements = new ArrayList<>();
+        statements.add(statement(1));
+        statements.add(statement(2));
+        statements.add(statement(1));
+        statements.add(statement(3));
+        statements.add(statement(1));
+        statements.add(statement(4));
+        statements.add(statement(1));
+        statements.add(statement(5));
+        statements.add(statement(1));
+        statements.add(statement(6));
+
+        dao.add(statements.iterator());
+
+        dao.flush();
+
+        Assert.assertEquals(6, getRyaCollection().count());
+    }
+
+    @Test
+    public void testDbCollectionFlush() throws Exception {
+        final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy();
+
+        final List<DBObject> objects = Lists.newArrayList(
+                storageStrategy.serialize(statement(1)),
+                storageStrategy.serialize(statement(2)),
+                storageStrategy.serialize(statement(2)),
+                null,
+                storageStrategy.serialize(statement(3)),
+                storageStrategy.serialize(statement(3)),
+                storageStrategy.serialize(statement(4))
+            );
+
+        final DbCollectionType collectionType = new DbCollectionType(getRyaDbCollection());
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+        final MongoDbBatchWriter<DBObject> mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(collectionType, mongoDbBatchWriterConfig);
+
+        mongoDbBatchWriter.start();
+        mongoDbBatchWriter.addObjectsToQueue(objects);
+        mongoDbBatchWriter.flush();
+        Thread.sleep(1_000);
+        mongoDbBatchWriter.addObjectsToQueue(objects);
+        mongoDbBatchWriter.flush();
+        Thread.sleep(1_000);
+        mongoDbBatchWriter.shutdown();
+        Assert.assertEquals(4, getRyaDbCollection().count());
+    }
+
+    @Test
+    public void testMongoCollectionFlush() throws Exception {
+        final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy();
+
+        final List<Document> documents = Lists.newArrayList(
+                toDocument(storageStrategy.serialize(statement(1))),
+                toDocument(storageStrategy.serialize(statement(2))),
+                toDocument(storageStrategy.serialize(statement(2))),
+                null,
+                toDocument(storageStrategy.serialize(statement(3))),
+                toDocument(storageStrategy.serialize(statement(3))),
+                toDocument(storageStrategy.serialize(statement(4)))
+            );
+
+        final MongoCollectionType mongoCollectionType = new MongoCollectionType(getRyaCollection());
+        final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+        final MongoDbBatchWriter<Document> mongoDbBatchWriter = new MongoDbBatchWriter<Document>(mongoCollectionType, mongoDbBatchWriterConfig);
+
+        mongoDbBatchWriter.start();
+        mongoDbBatchWriter.addObjectsToQueue(documents);
+        mongoDbBatchWriter.flush();
+        Thread.sleep(1_000);
+        mongoDbBatchWriter.addObjectsToQueue(documents);
+        mongoDbBatchWriter.flush();
+        Thread.sleep(1_000);
+        mongoDbBatchWriter.shutdown();
+        Assert.assertEquals(4, getRyaCollection().count());
+    }
+
+    private static Document toDocument(final DBObject dbObject) {
+        if (dbObject == null) {
+            return null;
+        }
+        final Document document = Document.parse(dbObject.toString());
+        return document;
+    }
+
+    private static RyaURI ryaURI(final int v) {
+        return new RyaURI("u:" + v);
+    }
+
+    private static RyaStatement statement(final int v) {
+        final RyaStatementBuilder builder = new RyaStatementBuilder();
+        builder.setPredicate(ryaURI(v));
+        builder.setSubject(ryaURI(v));
+        builder.setObject(ryaURI(v));
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
index 5f3605e..a014e8f 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
@@ -36,6 +36,7 @@ import org.apache.rya.mongodb.document.util.AuthorizationsUtil;
 import org.apache.rya.mongodb.document.visibility.DocumentVisibility;
 import org.bson.Document;
 import org.calrissian.mango.collect.CloseableIterable;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,7 +47,7 @@ import com.mongodb.client.MongoDatabase;
 
 public class MongoDBRyaDAOIT extends MongoTestBase {
     private MongoClient client;
-    private MongoDBRyaDAO dao;
+    private static MongoDBRyaDAO dao;
 
     @Before
     public void setUp() throws IOException, RyaDAOException{
@@ -55,6 +56,11 @@ public class MongoDBRyaDAOIT extends MongoTestBase {
         dao = new MongoDBRyaDAO(conf, client);
     }
 
+    @AfterClass
+    public static void tearDown() throws RyaDAOException {
+        dao.destroy();
+    }
+
     @Test
     public void testDeleteWildcard() throws RyaDAOException {
         final RyaStatementBuilder builder = new RyaStatementBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
index ffd4fd9..e325e82 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
@@ -19,10 +19,13 @@
 package org.apache.rya.mongodb;
 
 import org.apache.hadoop.conf.Configuration;
+import org.bson.Document;
 import org.junit.After;
 import org.junit.Before;
 
+import com.mongodb.DBCollection;
 import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
 
 /**
  * A base class that may be used when implementing Mongo DB tests that use the
@@ -57,4 +60,18 @@ public class MongoTestBase {
     public MongoClient getMongoClient() {
         return mongoClient;
     }
+
+    /**
+     * @return The Rya triples {@link MongoCollection}.
+     */
+    public MongoCollection<Document> getRyaCollection() {
+        return mongoClient.getDatabase(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName());
+    }
+
+    /**
+     * @return The Rya triples {@link DBCollection}.
+     */
+    public DBCollection getRyaDbCollection() {
+        return mongoClient.getDB(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
index f8ab40f..9ce6e22 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -121,6 +121,12 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
 
     @Override
     public void close() throws IOException {
+        flush();
+        try {
+            mongoDbBatchWriter.shutdown();
+        } catch (final MongoDbBatchWriterException e) {
+            throw new IOException("Error shutting down MongoDB batch writer", e);
+        }
     }
 
     @Override