You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by mi...@apache.org on 2017/10/24 13:40:33 UTC
incubator-rya git commit: RYA-393 Prevented MongoDB DAO Batch Writer from
dropping statements. Closes #236
Repository: incubator-rya
Updated Branches:
refs/heads/master 853e0eea7 -> b372ebcdb
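RYA-393 Prevented MongoDB DAO Batch Writer from dropping statements. Closes #236
The MongoDB batch writer now inserts each batch unordered and logs duplicate-key errors instead of failing the flush, so a single duplicate statement no longer causes the rest of its batch to be dropped. Secondary indexers now flush when the DAO flushes, and the MongoDB DAO now closes its indexers when it closes (the abstract Mongo indexer flushes and shuts down its own batch writer on close). The batch writer's "Queue Full Checker" thread is no longer created as a daemon thread.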
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b372ebcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b372ebcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b372ebcd
Branch: refs/heads/master
Commit: b372ebcdb43d19315b7d430ae55fedf45654245c
Parents: 853e0ee
Author: eric.white <Er...@parsons.com>
Authored: Wed Oct 4 11:21:06 2017 -0400
Committer: Aaron Mihalik <aa...@gmail.com>
Committed: Tue Oct 24 09:39:13 2017 -0400
----------------------------------------------------------------------
.../org/apache/rya/accumulo/AccumuloRyaDAO.java | 11 ++
.../org/apache/rya/mongodb/MongoDBRyaDAO.java | 18 ++
.../rya/mongodb/batch/MongoDbBatchWriter.java | 11 +-
.../batch/collection/MongoCollectionType.java | 3 +-
.../rya/mongodb/MongoDBRyaBatchWriterIT.java | 168 +++++++++++++++++++
.../org/apache/rya/mongodb/MongoDBRyaDAOIT.java | 8 +-
.../org/apache/rya/mongodb/MongoTestBase.java | 17 ++
.../indexing/mongodb/AbstractMongoIndexer.java | 6 +
8 files changed, 239 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
index 8c99e44..a8350d9 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -472,11 +472,22 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName
public void flush() throws RyaDAOException {
try {
mt_bw.flush();
+ flushIndexers();
} catch (final MutationsRejectedException e) {
throw new RyaDAOException(e);
}
}
+ private void flushIndexers() throws RyaDAOException {
+ for (final AccumuloIndexer indexer : secondaryIndexers) {
+ try {
+ indexer.flush();
+ } catch (final IOException e) {
+ logger.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e);
+ }
+ }
+ }
+
protected String[] getTables() {
// core tables
final List<String> tableNames = Lists.newArrayList(
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
index fe0f6f9..d263b9c 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java
@@ -171,6 +171,13 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
} catch (final MongoDbBatchWriterException e) {
throw new RyaDAOException("Error shutting down MongoDB batch writer", e);
}
+ for(final MongoSecondaryIndex indexer : secondaryIndexers) {
+ try {
+ indexer.close();
+ } catch (final IOException e) {
+ log.error("Error closing indexer: " + indexer.getClass().getSimpleName(), e);
+ }
+ }
if (mongoClient != null) {
mongoClient.close();
}
@@ -314,8 +321,19 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{
public void flush() throws RyaDAOException {
try {
mongoDbBatchWriter.flush();
+ flushIndexers();
} catch (final MongoDbBatchWriterException e) {
throw new RyaDAOException("Error flushing data.", e);
}
}
+
+ private void flushIndexers() throws RyaDAOException {
+ for (final MongoSecondaryIndex indexer : secondaryIndexers) {
+ try {
+ indexer.flush();
+ } catch (final IOException e) {
+ log.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
index 2f52b5c..b609276 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java
@@ -35,6 +35,8 @@ import org.apache.log4j.Logger;
import org.apache.rya.mongodb.batch.collection.CollectionType;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.mongodb.DuplicateKeyException;
+import com.mongodb.MongoBulkWriteException;
/**
* Handles batch writing MongoDB statement objects to the repository. It takes
@@ -95,7 +97,6 @@ public class MongoDbBatchWriter<T> {
private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat("Queue Full Checker Thread - %d")
- .setDaemon(true)
.build();
/**
@@ -218,6 +219,14 @@ public class MongoDbBatchWriter<T> {
if (!batch.isEmpty()) {
collectionType.insertMany(batch);
}
+ } catch (final DuplicateKeyException e) {
+ log.warn(e); // Suppress the stack trace so log doesn't get flooded.
+ } catch (final MongoBulkWriteException e) {
+ if (e.getMessage().contains("duplicate key error")) {
+ log.warn(e); // Suppress the stack trace so log doesn't get flooded.
+ } else {
+ throw new MongoDbBatchWriterException("Error flushing statements", e);
+ }
} catch (final Exception e) {
throw new MongoDbBatchWriterException("Error flushing statements", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
index 8fb796a..11f2dc1 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.bson.Document;
import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.InsertManyOptions;
/**
* Provides access to the {@link MongoCollection} type.
@@ -47,6 +48,6 @@ public class MongoCollectionType implements CollectionType<Document> {
@Override
public void insertMany(final List<Document> items) {
- collection.insertMany(items);
+ collection.insertMany(items, new InsertManyOptions().ordered(false));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java
new file mode 100644
index 0000000..68bbc27
--- /dev/null
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.DbCollectionType;
+import org.apache.rya.mongodb.batch.collection.MongoCollectionType;
+import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+
+/**
+ * Integration tests for the {@link MongoDbBatchWriter}.
+ */
+public class MongoDBRyaBatchWriterIT extends MongoTestBase {
+ private MongoDBRyaDAO dao;
+
+ private static void setupLogging() {
+ BasicConfigurator.configure();
+ }
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ setupLogging();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf.setBoolean("rya.mongodb.dao.flusheachupdate", false);
+ conf.setInt("rya.mongodb.dao.batchwriter.size", 50_000);
+ conf.setLong("rya.mongodb.dao.batchwriter.flushtime", 100L);
+
+ final MongoClient client = super.getMongoClient();
+ dao = new MongoDBRyaDAO(conf, client);
+ }
+
+ @Test
+ public void testDuplicateKeys() throws Exception {
+ final List<RyaStatement> statements = new ArrayList<>();
+ statements.add(statement(1));
+ statements.add(statement(2));
+ statements.add(statement(1));
+ statements.add(statement(3));
+ statements.add(statement(1));
+ statements.add(statement(4));
+ statements.add(statement(1));
+ statements.add(statement(5));
+ statements.add(statement(1));
+ statements.add(statement(6));
+
+ dao.add(statements.iterator());
+
+ dao.flush();
+
+ Assert.assertEquals(6, getRyaCollection().count());
+ }
+
+ @Test
+ public void testDbCollectionFlush() throws Exception {
+ final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy();
+
+ final List<DBObject> objects = Lists.newArrayList(
+ storageStrategy.serialize(statement(1)),
+ storageStrategy.serialize(statement(2)),
+ storageStrategy.serialize(statement(2)),
+ null,
+ storageStrategy.serialize(statement(3)),
+ storageStrategy.serialize(statement(3)),
+ storageStrategy.serialize(statement(4))
+ );
+
+ final DbCollectionType collectionType = new DbCollectionType(getRyaDbCollection());
+ final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+ final MongoDbBatchWriter<DBObject> mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(collectionType, mongoDbBatchWriterConfig);
+
+ mongoDbBatchWriter.start();
+ mongoDbBatchWriter.addObjectsToQueue(objects);
+ mongoDbBatchWriter.flush();
+ Thread.sleep(1_000);
+ mongoDbBatchWriter.addObjectsToQueue(objects);
+ mongoDbBatchWriter.flush();
+ Thread.sleep(1_000);
+ mongoDbBatchWriter.shutdown();
+ Assert.assertEquals(4, getRyaDbCollection().count());
+ }
+
+ @Test
+ public void testMongoCollectionFlush() throws Exception {
+ final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy();
+
+ final List<Document> documents = Lists.newArrayList(
+ toDocument(storageStrategy.serialize(statement(1))),
+ toDocument(storageStrategy.serialize(statement(2))),
+ toDocument(storageStrategy.serialize(statement(2))),
+ null,
+ toDocument(storageStrategy.serialize(statement(3))),
+ toDocument(storageStrategy.serialize(statement(3))),
+ toDocument(storageStrategy.serialize(statement(4)))
+ );
+
+ final MongoCollectionType mongoCollectionType = new MongoCollectionType(getRyaCollection());
+ final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
+ final MongoDbBatchWriter<Document> mongoDbBatchWriter = new MongoDbBatchWriter<Document>(mongoCollectionType, mongoDbBatchWriterConfig);
+
+ mongoDbBatchWriter.start();
+ mongoDbBatchWriter.addObjectsToQueue(documents);
+ mongoDbBatchWriter.flush();
+ Thread.sleep(1_000);
+ mongoDbBatchWriter.addObjectsToQueue(documents);
+ mongoDbBatchWriter.flush();
+ Thread.sleep(1_000);
+ mongoDbBatchWriter.shutdown();
+ Assert.assertEquals(4, getRyaCollection().count());
+ }
+
+ private static Document toDocument(final DBObject dbObject) {
+ if (dbObject == null) {
+ return null;
+ }
+ final Document document = Document.parse(dbObject.toString());
+ return document;
+ }
+
+ private static RyaURI ryaURI(final int v) {
+ return new RyaURI("u:" + v);
+ }
+
+ private static RyaStatement statement(final int v) {
+ final RyaStatementBuilder builder = new RyaStatementBuilder();
+ builder.setPredicate(ryaURI(v));
+ builder.setSubject(ryaURI(v));
+ builder.setObject(ryaURI(v));
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
index 5f3605e..a014e8f 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java
@@ -36,6 +36,7 @@ import org.apache.rya.mongodb.document.util.AuthorizationsUtil;
import org.apache.rya.mongodb.document.visibility.DocumentVisibility;
import org.bson.Document;
import org.calrissian.mango.collect.CloseableIterable;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -46,7 +47,7 @@ import com.mongodb.client.MongoDatabase;
public class MongoDBRyaDAOIT extends MongoTestBase {
private MongoClient client;
- private MongoDBRyaDAO dao;
+ private static MongoDBRyaDAO dao;
@Before
public void setUp() throws IOException, RyaDAOException{
@@ -55,6 +56,11 @@ public class MongoDBRyaDAOIT extends MongoTestBase {
dao = new MongoDBRyaDAO(conf, client);
}
+ @AfterClass
+ public static void tearDown() throws RyaDAOException {
+ dao.destroy();
+ }
+
@Test
public void testDeleteWildcard() throws RyaDAOException {
final RyaStatementBuilder builder = new RyaStatementBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
----------------------------------------------------------------------
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
index ffd4fd9..e325e82 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java
@@ -19,10 +19,13 @@
package org.apache.rya.mongodb;
import org.apache.hadoop.conf.Configuration;
+import org.bson.Document;
import org.junit.After;
import org.junit.Before;
+import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
/**
* A base class that may be used when implementing Mongo DB tests that use the
@@ -57,4 +60,18 @@ public class MongoTestBase {
public MongoClient getMongoClient() {
return mongoClient;
}
+
+ /**
+ * @return The Rya triples {@link MongoCollection}.
+ */
+ public MongoCollection<Document> getRyaCollection() {
+ return mongoClient.getDatabase(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName());
+ }
+
+ /**
+ * @return The Rya triples {@link DBCollection}.
+ */
+ public DBCollection getRyaDbCollection() {
+ return mongoClient.getDB(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
index f8ab40f..9ce6e22 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -121,6 +121,12 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat
@Override
public void close() throws IOException {
+ flush();
+ try {
+ mongoDbBatchWriter.shutdown();
+ } catch (final MongoDbBatchWriterException e) {
+ throw new IOException("Error shutting down MongoDB batch writer", e);
+ }
}
@Override