You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/06/04 18:37:06 UTC

[49/50] [abbrv] git commit: Add implementation of server-side filtering

Add implementation of server-side filtering


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8ac3bf51
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8ac3bf51
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8ac3bf51

Branch: refs/heads/master
Commit: 8ac3bf51d592c18928e9bf3c771e34be7196c5a6
Parents: ba1c5a8
Author: Damien Raude-Morvan <dr...@drazzib.com>
Authored: Wed Jun 4 09:08:49 2014 +0200
Committer: Damien Raude-Morvan <dr...@drazzib.com>
Committed: Wed Jun 4 09:08:49 2014 +0200

----------------------------------------------------------------------
 .../gora/mongodb/filters/BaseFactory.java       |  44 ++++++
 .../gora/mongodb/filters/DefaultFactory.java    | 153 +++++++++++++++++++
 .../gora/mongodb/filters/FilterFactory.java     |  42 +++++
 .../gora/mongodb/filters/MongoFilterUtil.java   | 119 +++++++++++++++
 .../apache/gora/mongodb/store/MongoMapping.java |   8 +-
 .../apache/gora/mongodb/store/MongoStore.java   |  26 +++-
 .../mongodb/filters/DefaultFactoryTest.java     | 138 +++++++++++++++++
 7 files changed, 521 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/8ac3bf51/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/BaseFactory.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/BaseFactory.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/BaseFactory.java
new file mode 100644
index 0000000..3dea131
--- /dev/null
+++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/BaseFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mongodb.filters;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+
+/**
+ * Skeleton {@link org.apache.gora.mongodb.filters.FilterFactory} which only
+ * keeps the back reference to {@link MongoFilterUtil}; the supported filter
+ * list and the translation itself are left to subclasses.
+ * 
+ * @author Damien Raude-Morvan <dr...@dictanova.com>
+ */
+public abstract class BaseFactory<K, T extends PersistentBase> implements
+    FilterFactory<K, T> {
+
+  /** Back reference handed over through {@link #setFilterUtil}. */
+  private MongoFilterUtil<K, T> filterUtil;
+
+  /**
+   * @return the utility last passed to
+   *         {@link #setFilterUtil(MongoFilterUtil)}, <tt>null</tt> if none
+   */
+  @Override
+  public MongoFilterUtil<K, T> getFilterUtil() {
+    return this.filterUtil;
+  }
+
+  /**
+   * Store the back reference to the filter utility.
+   * 
+   * @param util
+   *          utility to hand back from {@link #getFilterUtil()}
+   */
+  @Override
+  public void setFilterUtil(final MongoFilterUtil<K, T> util) {
+    filterUtil = util;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/8ac3bf51/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java
new file mode 100644
index 0000000..54cbdfd
--- /dev/null
+++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mongodb.filters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.*;
+import org.apache.gora.mongodb.store.MongoMapping;
+import org.apache.gora.mongodb.store.MongoStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+
+/**
+ * Default {@link FilterFactory}: translates {@link SingleFieldValueFilter},
+ * {@link MapFieldValueFilter} and {@link FilterList} into MongoDB query
+ * documents.
+ */
+public class DefaultFactory<K, T extends PersistentBase> extends
+    BaseFactory<K, T> {
+  private static final Log LOG = LogFactory.getLog(DefaultFactory.class);
+
+  /**
+   * @return canonical class names of the filters this factory translates
+   */
+  @Override
+  public List<String> getSupportedFilters() {
+    List<String> filters = new ArrayList<String>();
+    filters.add(SingleFieldValueFilter.class.getCanonicalName());
+    filters.add(MapFieldValueFilter.class.getCanonicalName());
+    filters.add(FilterList.class.getCanonicalName());
+    return filters;
+  }
+
+  /**
+   * Translate a Gora filter into a MongoDB query document.
+   * 
+   * @param filter
+   *          the Gora filter to translate
+   * @param store
+   *          store providing the field mapping and the map key encoding
+   * @return the MongoDB query, or <tt>null</tt> when the filter cannot be
+   *         translated by this factory
+   */
+  @Override
+  public DBObject createFilter(final Filter<K, T> filter,
+      final MongoStore<K, T> store) {
+
+    if (filter instanceof FilterList) {
+      FilterList<K, T> filterList = (FilterList<K, T>) filter;
+      return transformListFilter(filterList, store);
+    } else if (filter instanceof SingleFieldValueFilter) {
+      SingleFieldValueFilter<K, T> fieldFilter = (SingleFieldValueFilter<K, T>) filter;
+      return transformFieldFilter(fieldFilter, store);
+    } else if (filter instanceof MapFieldValueFilter) {
+      MapFieldValueFilter<K, T> mapFilter = (MapFieldValueFilter<K, T>) filter;
+      return transformMapFilter(mapFilter, store);
+    } else {
+      LOG.warn("MongoDB remote filter not yet implemented for "
+          + filter.getClass().getCanonicalName());
+      return null;
+    }
+  }
+
+  /**
+   * Translate every member of a {@link FilterList} and combine the results
+   * according to the operator of the list.
+   * <p>
+   * With <tt>MUST_PASS_ONE</tt> the members are wrapped in <tt>$or</tt>.
+   * Otherwise they are merged into one document (implicit AND); when two
+   * members share a top-level key (same field, or two <tt>$or</tt> clauses)
+   * they are wrapped in <tt>$and</tt> instead, because merging would
+   * silently overwrite the earlier condition.
+   * </p>
+   * 
+   * @param filterList
+   *          the list to translate
+   * @param store
+   *          store handed to the member translations
+   * @return the combined query, or <tt>null</tt> as soon as one member cannot
+   *         be translated
+   */
+  protected DBObject transformListFilter(final FilterList<K, T> filterList,
+      final MongoStore<K, T> store) {
+    List<DBObject> subQueries = new ArrayList<DBObject>();
+    for (Filter<K, T> filter : filterList.getFilters()) {
+      BasicDBObject subQuery = new BasicDBObject();
+      if (!getFilterUtil().setFilter(subQuery, filter, store)) {
+        return null;
+      }
+      subQueries.add(subQuery);
+    }
+
+    // An empty list puts no constraint on the query, whatever the operator.
+    if (!subQueries.isEmpty()
+        && filterList.getOperator() == FilterList.Operator.MUST_PASS_ONE) {
+      return new BasicDBObject("$or", subQueries);
+    }
+
+    BasicDBObject merged = new BasicDBObject();
+    for (DBObject subQuery : subQueries) {
+      for (String key : subQuery.keySet()) {
+        if (merged.containsField(key)) {
+          // putAll would replace the condition already stored under key
+          return new BasicDBObject("$and", subQueries);
+        }
+      }
+      merged.putAll(subQuery);
+    }
+    return merged;
+  }
+
+  /**
+   * Translate a filter on a plain field.
+   * 
+   * @param fieldFilter
+   *          the filter to translate
+   * @param store
+   *          store whose mapping gives the document field name
+   * @return the MongoDB query for this field
+   */
+  protected DBObject transformFieldFilter(
+      final SingleFieldValueFilter<K, T> fieldFilter,
+      final MongoStore<K, T> store) {
+    MongoMapping mapping = store.getMapping();
+    String dbFieldName = mapping.getDocumentField(fieldFilter.getFieldName());
+
+    return buildFieldQuery(dbFieldName, fieldFilter.getFilterOp(),
+        fieldFilter.getOperands(), fieldFilter.isFilterIfMissing());
+  }
+
+  /**
+   * Translate a filter on one entry of a map field; the entry is addressed
+   * as <tt>documentField.encodedMapKey</tt>.
+   * 
+   * @param mapFilter
+   *          the filter to translate
+   * @param store
+   *          store giving the document field name and the key encoding
+   * @return the MongoDB query for this map entry
+   */
+  protected DBObject transformMapFilter(
+      final MapFieldValueFilter<K, T> mapFilter, final MongoStore<K, T> store) {
+    MongoMapping mapping = store.getMapping();
+    String dbFieldName = mapping.getDocumentField(mapFilter.getFieldName())
+        + "." + store.encodeFieldKey(mapFilter.getMapKey().toString());
+
+    return buildFieldQuery(dbFieldName, mapFilter.getFilterOp(),
+        mapFilter.getOperands(), mapFilter.isFilterIfMissing());
+  }
+
+  /**
+   * Build the query for one document field, shared by the field and map
+   * translations.
+   */
+  private DBObject buildFieldQuery(final String dbFieldName,
+      final FilterOp filterOp, final List<Object> operands,
+      final boolean filterIfMissing) {
+    QueryBuilder builder = QueryBuilder.start(dbFieldName);
+    builder = appendToBuilder(builder, filterOp, operands);
+    if (!filterIfMissing) {
+      // If false, the find query will pass if the column is not found.
+      DBObject notExist = QueryBuilder.start(dbFieldName).exists(false).get();
+      builder = QueryBuilder.start().or(notExist, builder.get());
+    }
+    return builder.get();
+  }
+
+  /**
+   * Append the MongoDB operator matching <tt>filterOp</tt> to the builder.
+   * <p>
+   * Equality tests accept several operands (<tt>$in</tt> /
+   * <tt>$nin</tt>). Ordering comparisons need a scalar, so only the first
+   * operand is used: passing the whole list would make MongoDB compare the
+   * field with an array.
+   * </p>
+   * 
+   * @param builder
+   *          builder already positioned on the field
+   * @param filterOp
+   *          the Gora operator
+   * @param operands
+   *          values to compare the field with
+   * @return the same builder, for chaining
+   * @throws IllegalArgumentException
+   *           if the operator has no MongoDB equivalent, or if an ordering
+   *           comparison has no operand
+   */
+  protected QueryBuilder appendToBuilder(final QueryBuilder builder,
+      final FilterOp filterOp, final List<Object> operands) {
+    switch (filterOp) {
+    case EQUALS:
+      if (operands.size() == 1) {
+        builder.is(operands.iterator().next());
+      } else {
+        builder.in(operands);
+      }
+      break;
+    case NOT_EQUALS:
+      if (operands.size() == 1) {
+        builder.notEquals(operands.iterator().next());
+      } else {
+        builder.notIn(operands);
+      }
+      break;
+    case LESS:
+      builder.lessThan(firstOperand(filterOp, operands));
+      break;
+    case LESS_OR_EQUAL:
+      builder.lessThanEquals(firstOperand(filterOp, operands));
+      break;
+    case GREATER:
+      builder.greaterThan(firstOperand(filterOp, operands));
+      break;
+    case GREATER_OR_EQUAL:
+      builder.greaterThanEquals(firstOperand(filterOp, operands));
+      break;
+    default:
+      throw new IllegalArgumentException(filterOp
+          + " no MongoDB equivalent yet");
+    }
+    return builder;
+  }
+
+  /**
+   * Return the operand used by an ordering comparison.
+   */
+  private static Object firstOperand(final FilterOp filterOp,
+      final List<Object> operands) {
+    if (operands.isEmpty()) {
+      throw new IllegalArgumentException(filterOp
+          + " needs an operand to compare with");
+    }
+    return operands.get(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/8ac3bf51/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java
new file mode 100644
index 0000000..a2caabb
--- /dev/null
+++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mongodb.filters;
+
+import java.util.List;
+
+import org.apache.gora.filter.Filter;
+import org.apache.gora.mongodb.store.MongoStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+
+import com.mongodb.DBObject;
+
+/**
+ * Factory which creates remote (server-side) filters for MongoDB from Gora
+ * filters.
+ * 
+ * @author Damien Raude-Morvan <dr...@dictanova.com>
+ */
+public interface FilterFactory<K, T extends PersistentBase> {
+
+  /**
+   * @return names of the filter classes this factory handles;
+   *         {@link MongoFilterUtil} matches them against the canonical class
+   *         name of the filter to translate
+   */
+  List<String> getSupportedFilters();
+
+  /**
+   * Translate a Gora filter into a MongoDB query document.
+   * 
+   * @param filter
+   *          the Gora filter to translate
+   * @param store
+   *          the store the filter is created for
+   * @return the MongoDB query, or <tt>null</tt> if the filter cannot be
+   *         translated; {@link MongoFilterUtil} then reports the remote
+   *         filter as not applied
+   */
+  DBObject createFilter(Filter<K, T> filter, MongoStore<K, T> store);
+
+  /**
+   * @return the {@link MongoFilterUtil} given to
+   *         {@link #setFilterUtil(MongoFilterUtil)}
+   */
+  MongoFilterUtil<K, T> getFilterUtil();
+
+  /**
+   * @param util
+   *          back reference to the {@link MongoFilterUtil}
+   */
+  void setFilterUtil(MongoFilterUtil<K, T> util);
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/8ac3bf51/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java
new file mode 100644
index 0000000..1809db7
--- /dev/null
+++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mongodb.filters;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.mongodb.store.MongoStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.mongodb.DBObject;
+
+/**
+ * Manage creation of filtering {@link org.apache.gora.query.Query} using
+ * configured factories.
+ * <p>
+ * You can use the <tt>{@value #MONGO_FILTER_FACTORIES_PARAMETER}</tt>
+ * parameter to change the factory implementations used.
+ * </p>
+ * 
+ * @author Damien Raude-Morvan <dr...@dictanova.com>
+ * @see #setFilter(com.mongodb.DBObject, org.apache.gora.filter.Filter,
+ *      org.apache.gora.mongodb.store.MongoStore)
+ */
+public class MongoFilterUtil<K, T extends PersistentBase> {
+
+  /**
+   * Default implementation class for FilterFactory.
+   */
+  public static final String MONGO_FILTERS_DEFAULT_FACTORY = "org.apache.gora.mongodb.filters.DefaultFactory";
+
+  /**
+   * Configuration parameter which allows overriding the FilterFactory
+   * classes used.
+   */
+  public static final String MONGO_FILTER_FACTORIES_PARAMETER = "gora.mongodb.filter.factories";
+
+  /**
+   * Logger.
+   */
+  private static final Log LOG = LogFactory.getLog(MongoFilterUtil.class);
+
+  /**
+   * Factory to use for each supported filter, keyed by the canonical class
+   * name of the filter.
+   */
+  private final Map<String, FilterFactory<K, T>> factories = new LinkedHashMap<String, FilterFactory<K, T>>();
+
+  /**
+   * Instantiate the configured factories and register them for the filters
+   * they support. When several factories support the same filter, the last
+   * one configured wins.
+   * 
+   * @param conf
+   *          configuration read for
+   *          <tt>{@value #MONGO_FILTER_FACTORIES_PARAMETER}</tt>
+   * @throws GoraException
+   *           if a factory cannot be instantiated or registered
+   */
+  public MongoFilterUtil(final Configuration conf) throws GoraException {
+    String[] factoryClassNames = conf.getStrings(
+        MONGO_FILTER_FACTORIES_PARAMETER, MONGO_FILTERS_DEFAULT_FACTORY);
+
+    for (String factoryClass : factoryClassNames) {
+      try {
+        @SuppressWarnings("unchecked")
+        FilterFactory<K, T> factory = (FilterFactory<K, T>) ReflectionUtils
+            .newInstance(factoryClass);
+        // Factories call back into this utility (e.g. DefaultFactory for the
+        // members of a FilterList), so they need the back reference.
+        factory.setFilterUtil(this);
+        for (String filterClass : factory.getSupportedFilters()) {
+          factories.put(filterClass, factory);
+        }
+      } catch (Exception e) {
+        throw new GoraException(e);
+      }
+    }
+  }
+
+  /**
+   * @param filter
+   *          the Gora filter to translate
+   * @return the factory registered for the class of <tt>filter</tt>, or
+   *         <tt>null</tt> if there is none
+   */
+  public FilterFactory<K, T> getFactory(final Filter<K, T> filter) {
+    return factories.get(filter.getClass().getCanonicalName());
+  }
+
+  /**
+   * Set a filter on the <tt>query</tt>. It translates a Gora filter to a
+   * MongoDB filter.
+   * 
+   * @param query
+   *          The Mongo Query
+   * @param filter
+   *          The Gora filter.
+   * @param store
+   *          The MongoStore.
+   * @return <tt>true</tt> if the remote filter was added to <tt>query</tt>;
+   *         <tt>false</tt> (query left untouched) if it is not supported
+   */
+  public boolean setFilter(final DBObject query, final Filter<K, T> filter,
+      final MongoStore<K, T> store) {
+    FilterFactory<K, T> factory = getFactory(filter);
+    if (factory == null) {
+      LOG.warn("MongoDB remote filter factory not yet implemented for "
+          + filter.getClass().getCanonicalName());
+      return false;
+    }
+
+    DBObject mongoFilter = factory.createFilter(filter, store);
+    if (mongoFilter == null) {
+      LOG.warn("MongoDB remote filter not yet implemented for "
+          + filter.getClass().getCanonicalName());
+      return false;
+    }
+
+    query.putAll(mongoFilter);
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/8ac3bf51/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoMapping.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoMapping.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoMapping.java
index f5d2a34..06162b1 100644
--- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoMapping.java
+++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoMapping.java
@@ -17,9 +17,7 @@
  */
 package org.apache.gora.mongodb.store;
 
-import static org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType.DOCUMENT;
-import static org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType.LIST;
-import static org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType.valueOf;
+import static org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType.*;
 
 import java.util.HashMap;
 import java.util.regex.Pattern;
@@ -150,10 +148,6 @@ public class MongoMapping {
     return validMongoDocumentField.matcher(f).matches();
   }
 
-  public void addDocumentField(String document, String field, String type) {
-    // Just ignore for now
-  }
-
   /**
    * Register a new mapping between a field from the persisted class to a
    * MongoDocument field.

http://git-wip-us.apache.org/repos/asf/gora/blob/8ac3bf51/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java
index a971f49..f160b78 100644
--- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java
+++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java
@@ -33,6 +33,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.util.Utf8;
+import org.apache.gora.mongodb.filters.MongoFilterUtil;
 import org.apache.gora.mongodb.query.MongoDBQuery;
 import org.apache.gora.mongodb.query.MongoDBResult;
 import org.apache.gora.mongodb.utils.BSONDecorator;
@@ -122,6 +123,13 @@ public class MongoStore<K, T extends PersistentBase> extends
    */
   private MongoMapping mapping;
 
+  private MongoFilterUtil<K, T> filterUtil;
+
+  public MongoStore() {
+    // Create a default mapping that will be overridden in initialize method
+    this.mapping = new MongoMapping();
+  }
+
   /**
    * Initialize the data store by reading the credentials, setting the client's
    * properties up and reading the mapping file.
@@ -150,6 +158,8 @@ public class MongoStore<K, T extends PersistentBase> extends
       }
       super.initialize(keyClass, pPersistentClass, properties);
 
+      filterUtil = new MongoFilterUtil<K, T>(getConf());
+
       // Load the mapping
       MongoMappingBuilder<K, T> builder = new MongoMappingBuilder<K, T>(this);
       LOG.debug("Initializing Mongo store with mapping {}.",
@@ -247,6 +257,10 @@ public class MongoStore<K, T extends PersistentBase> extends
     }
   }
 
+  public MongoMapping getMapping() {
+    return mapping; // lets filter factories resolve document field names
+  }
+
   /**
    * Accessor to the name of the collection used.
    */
@@ -447,6 +461,14 @@ public class MongoStore<K, T extends PersistentBase> extends
     DBObject q = MongoDBQuery.toDBQuery(query);
     DBObject p = MongoDBQuery.toProjection(fields, mapping);
 
+    if (query.getFilter() != null) {
+      boolean succeeded = filterUtil.setFilter(q, query.getFilter(), this);
+      if (succeeded) {
+        // don't need local filter
+        query.setLocalFilterEnabled(false);
+      }
+    }
+
     // Execute the query on the collection
     DBCursor cursor = mongoClientColl.find(q, p);
     if (query.getLimit() > 0)
@@ -993,7 +1015,7 @@ public class MongoStore<K, T extends PersistentBase> extends
    *          char with only dots.
    * @return encoded string with "\u00B7" chars..
    */
-  protected String encodeFieldKey(final String key) {
+  public String encodeFieldKey(final String key) {
     if (key == null) {
       return null;
     }
@@ -1007,7 +1029,7 @@ public class MongoStore<K, T extends PersistentBase> extends
    *          encoded string with "\u00B7" chars.
    * @return Cleanup up char with only dots.
    */
-  protected String decodeFieldKey(final String key) {
+  public String decodeFieldKey(final String key) {
     if (key == null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/gora/blob/8ac3bf51/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java
new file mode 100644
index 0000000..43631ef
--- /dev/null
+++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.mongodb.filters;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.filter.FilterList;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.MapFieldValueFilter;
+import org.apache.gora.filter.SingleFieldValueFilter;
+import org.apache.gora.mongodb.store.MongoStore;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.mongodb.DBObject;
+
+/**
+ * Checks the MongoDB queries built by {@link DefaultFactory} for field, map
+ * and list filters.
+ */
+public class DefaultFactoryTest {
+
+  private FilterFactory<String, WebPage> filterFactory;
+  private MongoStore<String, WebPage> store;
+
+  @Before
+  public void setUp() throws Exception {
+    MongoFilterUtil<String, WebPage> filterUtil = new MongoFilterUtil<String, WebPage>(
+        new Configuration());
+    filterFactory = new DefaultFactory<String, WebPage>();
+    filterFactory.setFilterUtil(filterUtil);
+
+    // The store is not initialized: register a dummy mapping by hand
+    store = new MongoStore<String, WebPage>();
+    store.getMapping().addClassField(null, "headers", "h", "document");
+    store.getMapping().addClassField(null, "url", "url", "string");
+  }
+
+  @Test
+  public void testCreateFilter_singleField_notEquals() throws Exception {
+    SingleFieldValueFilter<String, WebPage> urlFilter = newUrlFilter(
+        FilterOp.NOT_EQUALS, true);
+
+    DBObject mongoQuery = filterFactory.createFilter(urlFilter, store);
+    String expected = "{ \"url\" : { \"$ne\" : \"http://www.example.com\"}}";
+    assertEquals(expected, mongoQuery.toString());
+  }
+
+  @Test
+  public void testCreateFilter_singleField_equalsOrNull() throws Exception {
+    // filterIfMissing == false: include doc with missing field
+    SingleFieldValueFilter<String, WebPage> urlFilter = newUrlFilter(
+        FilterOp.EQUALS, false);
+
+    DBObject mongoQuery = filterFactory.createFilter(urlFilter, store);
+    String expected = "{ \"$or\" : [ { \"url\" : { \"$exists\" : false}} , { \"url\" : \"http://www.example.com\"}]}";
+    assertEquals(expected, mongoQuery.toString());
+  }
+
+  @Test
+  public void testCreateFilter_mapField_notEquals() throws Exception {
+    MapFieldValueFilter<String, WebPage> headersFilter = newHeadersFilter(
+        FilterOp.NOT_EQUALS, true);
+
+    DBObject mongoQuery = filterFactory.createFilter(headersFilter, store);
+    String expected = "{ \"h.C·T\" : { \"$ne\" : \"text/html\"}}";
+    assertEquals(expected, mongoQuery.toString());
+  }
+
+  @Test
+  public void testCreateFilter_mapField_equalsOrNull() throws Exception {
+    // filterIfMissing == false: include doc with missing field
+    MapFieldValueFilter<String, WebPage> headersFilter = newHeadersFilter(
+        FilterOp.EQUALS, false);
+
+    DBObject mongoQuery = filterFactory.createFilter(headersFilter, store);
+    String expected = "{ \"$or\" : [ { \"h.C·T\" : { \"$exists\" : false}} , { \"h.C·T\" : \"text/html\"}]}";
+    assertEquals(expected, mongoQuery.toString());
+  }
+
+  @Test
+  public void testCreateFilter_list_empty() throws Exception {
+    FilterList<String, WebPage> emptyList = new FilterList<String, WebPage>();
+
+    DBObject mongoQuery = filterFactory.createFilter(emptyList, store);
+    assertEquals("{ }", mongoQuery.toString());
+  }
+
+  @Test
+  public void testCreateFilter_list_2() throws Exception {
+    FilterList<String, WebPage> bothFilters = new FilterList<String, WebPage>();
+    bothFilters.addFilter(newHeadersFilter(FilterOp.EQUALS, true));
+    bothFilters.addFilter(newUrlFilter(FilterOp.EQUALS, true));
+
+    DBObject mongoQuery = filterFactory.createFilter(bothFilters, store);
+    String expected = "{ \"h.C·T\" : \"text/html\" , \"url\" : \"http://www.example.com\"}";
+    assertEquals(expected, mongoQuery.toString());
+  }
+
+  /** Filter on the <tt>C.T</tt> entry of the headers map. */
+  private MapFieldValueFilter<String, WebPage> newHeadersFilter(
+      final FilterOp op, final boolean filterIfMissing) {
+    MapFieldValueFilter<String, WebPage> headersFilter = new MapFieldValueFilter<String, WebPage>();
+    headersFilter.setFieldName(WebPage.Field.HEADERS.toString());
+    headersFilter.setMapKey(new Utf8("C.T"));
+    headersFilter.getOperands().add("text/html");
+    headersFilter.setFilterOp(op);
+    headersFilter.setFilterIfMissing(filterIfMissing);
+    return headersFilter;
+  }
+
+  /** Filter on the url field. */
+  private SingleFieldValueFilter<String, WebPage> newUrlFilter(
+      final FilterOp op, final boolean filterIfMissing) {
+    SingleFieldValueFilter<String, WebPage> urlFilter = new SingleFieldValueFilter<String, WebPage>();
+    urlFilter.setFieldName(WebPage.Field.URL.toString());
+    urlFilter.getOperands().add("http://www.example.com");
+    urlFilter.setFilterOp(op);
+    urlFilter.setFilterIfMissing(filterIfMissing);
+    return urlFilter;
+  }
+}