You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/01/29 23:02:59 UTC

svn commit: r1562607 - in /gora/branches/GORA_94: ./ gora-core/src/main/java/org/apache/gora/filter/ gora-core/src/main/java/org/apache/gora/query/ gora-core/src/main/java/org/apache/gora/query/impl/ gora-core/src/main/java/org/apache/gora/query/ws/imp...

Author: lewismc
Date: Wed Jan 29 22:02:59 2014
New Revision: 1562607

URL: http://svn.apache.org/r1562607
Log:
GORA-119 implement a filter enabled scan in gora

Added:
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
    gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/
    gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
    gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java
Modified:
    gora/branches/GORA_94/CHANGES.txt
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
    gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
    gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
    gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java

Modified: gora/branches/GORA_94/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/CHANGES.txt?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/CHANGES.txt (original)
+++ gora/branches/GORA_94/CHANGES.txt Wed Jan 29 22:02:59 2014
@@ -4,6 +4,8 @@
 
 Gora Change Log
 
+* GORA-119 implement a filter enabled scan in gora (ferdy, kturner, enis, Tien Nguyen Manh via lewismc)
+
 * GORA-231 Provide better error handling in AccumuloStore.readMapping when file does not exist. (apgiannakidis via lewismc)
 
 * GORA-283 Specify field name for types not being considered in gora-cassandra (lewismc)

Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Defines filtering (possibly including modification) of rows. By default
+ * all filtering is done client side (in generic Gora classes). Datastore
+ * implementations can decide whether to install remote filters, when possible.
+ *  
+ * @param <K>
+ * @param <T>
+ */
+public interface Filter<K, T extends Persistent> extends Writable{
+  
+  /**
+   * Filter the key and persistent. Modification is possible.
+   * 
+   * @param key
+   * @param persistent
+   * @return <code>true</code> if the row is filtered out (excluded), 
+   * <code>false</code> otherwise.
+   */
+  public boolean filter(K key, T persistent);
+}

Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.filter;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.ReflectionUtils;
+import org.apache.hadoop.io.Text;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FilterList<K, T extends PersistentBase> implements Filter<K, T> {
+  /** set operator */
+  public static enum Operator {
+    /** !AND */
+    MUST_PASS_ALL,
+    /** !OR */
+    MUST_PASS_ONE
+  }
+  
+  private Operator operator = Operator.MUST_PASS_ALL;
+  private List<Filter<K, T>> filters = new ArrayList<Filter<K, T>>();
+  
+  public FilterList() {
+  }
+  
+  public FilterList(final List<Filter<K, T>> rowFilters) {
+    this.filters = rowFilters;
+  }
+  
+  public FilterList(final Operator operator) {
+    this.operator = operator;
+  }
+  
+  public FilterList(final Operator operator, final List<Filter<K, T>> rowFilters) {
+    this.filters = rowFilters;
+    this.operator = operator;
+  }
+
+  public List<Filter<K, T>> getFilters() {
+    return filters;
+  }
+  
+  public Operator getOperator() {
+    return operator;
+  }
+  
+  public void addFilter(Filter<K, T> filter) {
+    this.filters.add(filter);
+  }
+  
+  /** Reads the operator and nested filters, replacing any filters held before the call. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    operator = Operator.values()[in.readByte()];
+    int size = in.readInt();
+    // Always start from a fresh list: a reused instance must not keep stale
+    // filters when the serialized list is empty.
+    filters = new ArrayList<Filter<K, T>>(Math.max(size, 0));
+    try {
+      for (int i = 0; i < size; i++) {
+        @SuppressWarnings("unchecked")
+        Class<? extends Filter<K, T>> cls = (Class<? extends Filter<K, T>>) Class.forName(Text.readString(in)).asSubclass(Filter.class);
+        Filter<K, T> filter = ReflectionUtils.newInstance(cls);
+        filter.readFields(in);
+        filters.add(filter);
+      }
+    } catch (Exception e) {
+      throw (IOException)new IOException("Failed filter init").initCause(e);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(operator.ordinal());
+    out.writeInt(filters.size());
+    for (Filter<K, T> filter : filters) {
+      Text.writeString(out, filter.getClass().getName());
+      filter.write(out);
+    }
+  }
+
+  @Override
+  public boolean filter(K key, T persistent) {
+    // TODO not yet implemented
+    return false;
+  }
+
+}

Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+/**
+ * Defines a set of common filter compare operations.
+ */
+public enum FilterOp {
+  EQUALS,
+  NOT_EQUALS,
+  LESS,
+  LESS_OR_EQUAL,
+  GREATER,
+  GREATER_OR_EQUAL,
+}

Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A filter that checks the value stored under a given key of a map field in the persistent.
+ * 
+ * @param <K>
+ * @param <T>
+ */
+public class MapFieldValueFilter<K, T extends PersistentBase> implements Filter<K, T> {
+
+  protected String fieldName;
+  protected Utf8 mapKey;
+  protected FilterOp filterOp;
+  protected List<Object> operands = new ArrayList<Object>();
+  protected boolean filterIfMissing = false;
+
+  private Configuration conf = new Configuration(); // just create empty conf,
+                                                    // needed for ObjectWritable
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, fieldName);
+    Text.writeString(out, mapKey.toString());
+    WritableUtils.writeEnum(out, filterOp);
+    WritableUtils.writeVInt(out, operands.size());
+    for (int i = 0; i < operands.size(); i++) {
+      Object operand = operands.get(i);
+      if (operand instanceof String) {
+        throw new IllegalStateException("Use Utf8 instead of String for operands");
+      }
+      if (operand instanceof Utf8) {
+        operand = operand.toString();
+      }
+      if (operand instanceof Boolean) {
+        ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
+      } else if (operand instanceof Character) {
+        ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
+      } else if (operand instanceof Byte) {
+        ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
+      } else if (operand instanceof Short) {
+        ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
+      } else if (operand instanceof Integer) {
+        ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
+      } else if (operand instanceof Long) {
+        ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
+      } else if (operand instanceof Float) {
+        ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
+      } else if (operand instanceof Double) {
+        ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
+      } else if (operand instanceof Void) {
+        ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
+      } else {
+        ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
+      }
+    }
+    out.writeBoolean(filterIfMissing);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fieldName = Text.readString(in);
+    mapKey = new Utf8(Text.readString(in));
+    filterOp = WritableUtils.readEnum(in, FilterOp.class);
+    operands.clear();
+    int operandsSize = WritableUtils.readVInt(in);
+    for (int i = 0; i < operandsSize; i++) {
+      Object operand = ObjectWritable.readObject(in, conf);
+      if (operand instanceof String) {
+        operand = new Utf8((String) operand);
+      }
+      operands.add(operand);
+    }
+    filterIfMissing = in.readBoolean();
+  }
+
+  @Override
+  public boolean filter(K key, T persistent) {
+    int fieldIndex = persistent.getSchema().getField(fieldName).pos();
+    @SuppressWarnings("unchecked")
+    Map<Utf8, ?> fieldValue = (Map<Utf8, ?>) persistent.get(fieldIndex);
+    if (fieldValue == null) {
+      return filterIfMissing;
+    }
+    Object value = fieldValue.get(mapKey);
+    Object operand = operands.get(0);
+    if (value == null) {
+      return filterIfMissing;
+    }
+    if (filterOp.equals(FilterOp.EQUALS)) {
+      boolean equals = value.equals(operand);
+      return !equals;
+    } else if (filterOp.equals(FilterOp.NOT_EQUALS)) {
+      boolean equals = value.equals(operand);
+      return equals;
+    } else {
+      throw new IllegalStateException(filterOp + " not yet implemented");
+    }
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+
+  public void setFieldName(String fieldName) {
+    this.fieldName = fieldName;
+  }
+
+  public Utf8 getMapKey() {
+    return mapKey;
+  }
+
+  public void setMapKey(Utf8 mapKey) {
+    this.mapKey = mapKey;
+  }
+
+  public FilterOp getFilterOp() {
+    return filterOp;
+  }
+
+  public void setFilterOp(FilterOp filterOp) {
+    this.filterOp = filterOp;
+  }
+
+  public List<Object> getOperands() {
+    return operands;
+  }
+
+  public void setOperands(List<Object> operands) {
+    this.operands = operands;
+  }
+
+  public void setFilterIfMissing(boolean filterIfMissing) {
+    this.filterIfMissing = filterIfMissing;
+  }
+
+  public boolean isFilterIfMissing() {
+    return filterIfMissing;
+  }
+
+  @Override
+  public String toString() {
+    return "MapFieldValueFilter [fieldName=" + fieldName + ", mapKey=" + mapKey + ", filterOp=" + filterOp + ", operands=" + operands
+        + ", filterIfMissing=" + filterIfMissing + "]";
+  }
+}

Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A filter that checks for a single field in the persistent.
+ * 
+ * @param <K>
+ * @param <T>
+ */
+public class SingleFieldValueFilter<K, T extends PersistentBase> implements Filter<K, T>{
+
+  protected String fieldName;
+  protected FilterOp filterOp;
+  protected List<Object> operands = new ArrayList<Object>();
+  protected boolean filterIfMissing = false;
+
+  private Configuration conf = new Configuration(); //just create empty conf,
+                                                    //needed for ObjectWritable
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, fieldName);
+    WritableUtils.writeEnum(out, filterOp);
+    WritableUtils.writeVInt(out, operands.size());
+    for (int i = 0; i < operands.size(); i++) {
+      Object operand = operands.get(i);
+      if (operand instanceof String) {
+        throw new IllegalStateException("Use Utf8 instead of String for operands");
+      }
+      if (operand instanceof Utf8) {
+        operand=operand.toString();
+      }
+      if (operand instanceof Boolean) {
+        ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
+      } else if (operand instanceof Character) {
+        ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
+      } else if (operand instanceof Byte) {
+        ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
+      } else if (operand instanceof Short) {
+        ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
+      } else if (operand instanceof Integer) {
+        ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
+      } else if (operand instanceof Long) {
+        ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
+      } else if (operand instanceof Float) {
+        ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
+      } else if (operand instanceof Double) {
+        ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
+      } else if (operand instanceof Void) {
+        ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
+      } else {
+        ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
+      }
+    }
+    out.writeBoolean(filterIfMissing);
+  }  
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fieldName = Text.readString(in);
+    filterOp = WritableUtils.readEnum(in, FilterOp.class);
+    operands.clear();
+    int operandsSize = WritableUtils.readVInt(in);
+    for (int i = 0; i < operandsSize; i++) {
+      Object operand = ObjectWritable.readObject(in, conf);
+      if (operand instanceof String) {
+        operand=new Utf8((String) operand);
+      }
+      operands.add(operand);
+    }
+    filterIfMissing = in.readBoolean();
+  }
+
+  @Override
+  public boolean filter(K key, T persistent) {
+    int fieldIndex = persistent.getSchema().getField(fieldName).pos(); //.getIndexNamed(fieldName); throws org.apache.avro.AvroRuntimeException: Not a union:
+    Object fieldValue = persistent.get(fieldIndex);
+    Object operand = operands.get(0);
+    if (fieldValue == null) {
+      return filterIfMissing;
+    }
+    if (filterOp.equals(FilterOp.EQUALS)) {
+      boolean equals = operand.equals(fieldValue);
+      return !equals;
+    } else if (filterOp.equals(FilterOp.NOT_EQUALS)) {
+      boolean equals = operand.equals(fieldValue);
+      return equals;
+    // TODO implement other FilterOp operators
+    } else {
+      throw new IllegalStateException(filterOp + " not yet implemented");
+    }
+  }
+  
+  public String getFieldName() {
+    return fieldName;
+  }
+  public void setFieldName(String fieldName) {
+    this.fieldName = fieldName;
+  }
+  
+  public FilterOp getFilterOp() {
+    return filterOp;
+  }
+  public void setFilterOp(FilterOp filterOp) {
+    this.filterOp = filterOp;
+  }
+
+  public List<Object> getOperands() {
+    return operands;
+  }
+  public void setOperands(List<Object> operands) {
+    this.operands = operands;
+  }
+  
+  public void setFilterIfMissing(boolean filterIfMissing) {
+    this.filterIfMissing = filterIfMissing;
+  }
+  
+  public boolean isFilterIfMissing() {
+    return filterIfMissing;
+  }
+
+  @Override
+  public String toString() {
+    return "SingleFieldValueFilter [fieldName=" + fieldName + ", filterOp="
+        + filterOp + ", operands=" + operands + ", filterIfMissing="
+        + filterIfMissing + "]";
+  }
+}

Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java Wed Jan 29 22:02:59 2014
@@ -18,8 +18,7 @@
 
 package org.apache.gora.query;
 
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.store.DataStore;
 
@@ -71,6 +70,30 @@ public interface Query<K, T extends Pers
   void setFields(String... fieldNames);
 
   String[] getFields();
+  
+  /**
+   * @param filter The filter to set on this query.
+   */
+  public void setFilter(Filter<K, T> filter);
+  
+  /**
+   * @return The filter on this query, or <code>null</code> if none.
+   */
+  public Filter<K, T> getFilter();
+  
+  /**
+   * Set whether the local filter is enabled. This is usually called by
+   * data store implementations that install the filter remotely
+   * (for efficiency reasons) and therefore disable the local filter.
+   * @param enable
+   */
+  void setLocalFilterEnabled(boolean enable);
+  
+  /**
+   * @return Whether the local filter is enabled.
+   * See {@link #setLocalFilterEnabled(boolean)}.
+   */
+  boolean isLocalFilterEnabled();
 
   /* Dimension : key */ 
   void setKey(K key);

Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java Wed Jan 29 22:02:59 2014
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -127,6 +128,16 @@ public String[] getLocations() {
   public void setLimit(long limit) {
     baseQuery.setLimit(limit);
   }
+  
+  @Override
+  public Filter<K, T> getFilter() {
+    return baseQuery.getFilter();
+  }
+  
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    baseQuery.setFilter(filter);
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {

Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java Wed Jan 29 22:02:59 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -56,7 +57,8 @@ public abstract class QueryBase<K, T ext
   protected long startTime = -1;
   protected long endTime = -1;
 
-  protected String filter;
+  protected Filter<K, T> filter;
+  protected boolean localFilterEnabled=true;
 
   protected long limit = -1;
 
@@ -81,16 +83,6 @@ public abstract class QueryBase<K, T ext
     return dataStore;
   }
 
-//  @Override
-//  public void setQueryString(String queryString) {
-//    this.queryString = queryString;
-//  }
-//
-//  @Override
-//  public String getQueryString() {
-//    return queryString;
-//  }
-
   @Override
   public void setFields(String... fields) {
     this.fields = fields;
@@ -100,7 +92,27 @@ public abstract class QueryBase<K, T ext
 public String[] getFields() {
     return fields;
   }
-
+  
+  @Override
+  public Filter<K, T> getFilter() {
+    return filter;
+  }
+  
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    this.filter=filter;
+  }
+  
+  @Override
+  public boolean isLocalFilterEnabled() {
+    return localFilterEnabled;
+  }
+  
+  @Override
+  public void setLocalFilterEnabled(boolean enable) {
+    this.localFilterEnabled=enable;
+  }
+  
   @Override
   public void setKey(K key) {
     setKeyRange(key, key);
@@ -176,16 +188,6 @@ public String[] getFields() {
     return endTime;
   }
 
-//  @Override
-//  public void setFilter(String filter) {
-//    this.filter = filter;
-//  }
-//
-//  @Override
-//  public String getFilter() {
-//    return filter;
-//  }
-
   @Override
   public void setLimit(long limit) {
     this.limit = limit;
@@ -225,12 +227,20 @@ public String[] getFields() {
       startKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
     if(!nullFields[3])
       endKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
-    if(!nullFields[4])
-      filter = Text.readString(in);
+    if(!nullFields[4]) {
+      String filterClass = Text.readString(in);
+      try {
+        filter = (Filter<K, T>) ReflectionUtils.newInstance(ClassLoadingUtils.loadClass(filterClass), conf);
+        filter.readFields(in);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+    }
 
     startTime = WritableUtils.readVLong(in);
     endTime = WritableUtils.readVLong(in);
     limit = WritableUtils.readVLong(in);
+    localFilterEnabled = in.readBoolean(); 
   }
 
   //@Override
@@ -250,12 +260,15 @@ public String[] getFields() {
       IOUtils.serialize(getConf(), out, startKey, dataStore.getKeyClass());
     if(endKey != null)
       IOUtils.serialize(getConf(), out, endKey, dataStore.getKeyClass());
-    if(filter != null)
-      Text.writeString(out, filter);
+    if(filter != null) {
+      Text.writeString(out, filter.getClass().getCanonicalName());
+      filter.write(out);
+    }
 
     WritableUtils.writeVLong(out, getStartTime());
     WritableUtils.writeVLong(out, getEndTime());
     WritableUtils.writeVLong(out, getLimit());
+    out.writeBoolean(localFilterEnabled);
   }
 
   @SuppressWarnings({ "rawtypes" })
@@ -271,6 +284,7 @@ public String[] getFields() {
       builder.append(endKey, that.endKey);
       builder.append(filter, that.filter);
       builder.append(limit, that.limit);
+      builder.append(localFilterEnabled, that.localFilterEnabled);
       return builder.isEquals();
     }
     return false;
@@ -286,6 +300,7 @@ public String[] getFields() {
     builder.append(endKey);
     builder.append(filter);
     builder.append(limit);
+    builder.append(localFilterEnabled);
     return builder.toHashCode();
   }
 
@@ -298,6 +313,7 @@ public String[] getFields() {
     builder.append("endKey", endKey);
     builder.append("filter", filter);
     builder.append("limit", limit);
+    builder.append("localFilterEnabled", localFilterEnabled);
 
     return builder.toString();
   }

Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java Wed Jan 29 22:02:59 2014
@@ -18,13 +18,14 @@
 
 package org.apache.gora.query.impl;
 
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
 
+import java.io.IOException;
+
 /**
  * Base class for {@link Result} implementations.
  */
@@ -102,17 +103,37 @@ public abstract class ResultBase<K, T ex
   
   @Override
   public final boolean next() throws Exception, IOException {
-	  if(isLimitReached()) {
-		  return false;
-	  }
-	    
-	  clear();
+    if(isLimitReached()) {
+      return false;
+    }
+      
+    boolean ret;
+    do {
+      clear();
+      persistent = getOrCreatePersistent(persistent);
+      ret = nextInner();
+      if (ret == false) {
+        //nextInner() produced no further row: stop and report the end
+        break;
+      }
+      //filter() returning true means the row is rejected, so fetch the next one
+    } while (filter(key, persistent));
+    
+    if(ret) ++offset;
+    return ret;
+  }
+  /** Returns true when the row must be skipped; always false if local filtering is off or the query has no filter. */
+  protected boolean filter(K key, T persistent) {
+    if (!query.isLocalFilterEnabled()) {
+      return false;
+    }
+    
+    Filter<K, T> filter = query.getFilter();
+    if (filter == null) {
+      return false;
+    }
     
-	  persistent = getOrCreatePersistent(persistent);
-	  boolean ret = nextInner();
-	  
-	  if(ret) ++offset;
-	  return ret;
+    return filter.filter(key, persistent);
   }
   
   @Override

Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java Wed Jan 29 22:02:59 2014
@@ -18,15 +18,16 @@
 
 package org.apache.gora.query.ws.impl;
 
-import java.util.Arrays;
-
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.store.DataStore;
 
+import java.util.Arrays;
+
 /**
- * Implementation for {@link PartitionQuery}.
+ * Webservices implementation for {@link PartitionQuery}.
  */
 //TODO this class should be reviewed when a web service backed datastore has the
 // ability to write partition queries
@@ -182,6 +183,27 @@ public class PartitionWSQueryImpl<K, T e
   public void setLimit(long limit) {
     baseQuery.setLimit(limit);
   }
+  /** Returns the filter stored on this partition query. */
+  @Override
+  public Filter<K, T> getFilter() {
+    return filter;
+  }
+  /** Stores the filter on this instance. NOTE(review): setLimit() above forwards to baseQuery, this does not - confirm intended. */
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    this.filter=filter;
+  }
+  /** Returns the local-filter flag stored on this partition query. */
+  @Override
+  public boolean isLocalFilterEnabled() {
+    return localFilterEnabled;
+  }
+  /** Stores the local-filter flag on this partition query. */
+  @Override
+  public void setLocalFilterEnabled(boolean enable) {
+    this.localFilterEnabled=enable;
+  }
+  
 
   @Override
   @SuppressWarnings({ "rawtypes" })

Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java Wed Jan 29 22:02:59 2014
@@ -21,6 +21,7 @@ package org.apache.gora.query.ws.impl;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -58,11 +59,8 @@ public abstract class QueryWSBase<K, T e
   protected long startTime = -1;
   protected long endTime = -1;
 
-  /**
-   * Query filter
-   */
-  protected String filter;
-
+  protected Filter<K, T> filter;
+  protected boolean localFilterEnabled=true;
   /**
    * Max number of results to be retrieved
    */

Added: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+public class TestMapFieldValueFilter {
+  /** Serializes a configured filter with WritableUtils and checks that readFields() restores every setting. */
+  @Test
+  public void testSerialization() throws IOException {
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+    //filter on the "fetchTime" entry of the metadata map field
+    filter.setFieldName(WebPage.Field.METADATA.toString());
+    filter.setMapKey(new Utf8("fetchTime"));
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("http://example.org"));
+    byte[] byteArray = WritableUtils.toByteArray(filter);
+    MapFieldValueFilter<String, WebPage> filter2 = new MapFieldValueFilter<String, WebPage>();
+    filter2.readFields(new DataInputStream(new ByteArrayInputStream(byteArray)));
+    assertEquals(filter.getFieldName(), filter2.getFieldName());
+    assertEquals(filter.getMapKey(), filter2.getMapKey());
+    assertEquals(filter.getFilterOp(), filter2.getFilterOp());
+    assertArrayEquals(filter.getOperands().toArray(), filter2.getOperands().toArray());
+    assertEquals(filter.isFilterIfMissing(), filter2.isFilterIfMissing());
+  }
+  /** EQUALS on one outlinks entry: a match is kept, a mismatch is filtered, a missing value follows filterIfMissing. */
+  @Test
+  public void testFilterBasics() {
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+    //filter on the "example" entry of the outlinks map field
+    filter.setFieldName(WebPage.Field.OUTLINKS.toString());
+    filter.setMapKey(new Utf8("example"));
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("http://example.org"));
+    
+    WebPage page = WebPage.newBuilder().build();
+    page.getOutlinks().put(new Utf8("example"), new Utf8("http://example.org"));
+    assertFalse(filter.filter("irrelevant", page));
+    page.getOutlinks().put(new Utf8("example"), new Utf8("http://example2.com"));
+    assertTrue(filter.filter("irrelevant", page));
+    page = new WebPage();
+    assertTrue(filter.filter("irrelevant", page));
+    filter.setFilterIfMissing(false);
+    
+    assertFalse(filter.filter("irrelevant", page));
+  }
+  /** Walks one page through three states for the configured key: entry absent, entry mismatching, entry matching. */
+  @Test
+  public void testFilterEntryInMap() {
+    MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+    //filter on the "foobar.whatever" entry of the outlinks map field
+    filter.setFieldName(WebPage.Field.OUTLINKS.toString());
+    filter.setMapKey(new Utf8("foobar.whatever"));
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("Click here for foobar!"));
+    
+    WebPage page = WebPage.newBuilder().build();
+    assertTrue(filter.filter("irrelevant", page));
+    page.getOutlinks().put(new Utf8("foobar.whatever"), new Utf8("Mismatch!"));
+    assertTrue(filter.filter("irrelevant", page));
+    page.getOutlinks().put(new Utf8("foobar.whatever"), new Utf8("Click here for foobar!"));
+    assertFalse(filter.filter("irrelevant", page));
+  }
+
+}

Added: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+public class TestSingleFieldValueFilter {
+  /** Serializes a configured filter with WritableUtils and checks that readFields() restores every setting. */
+  @Test
+  public void testSerialization() throws IOException {
+    SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String, WebPage>();
+    //set filter field to url
+    filter.setFieldName(WebPage.Field.URL.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("http://example.org"));
+    byte[] byteArray = WritableUtils.toByteArray(filter);
+    SingleFieldValueFilter<String, WebPage> filter2 = new SingleFieldValueFilter<String, WebPage>();
+    filter2.readFields(new DataInputStream(new ByteArrayInputStream(byteArray)));
+    assertEquals(filter.getFieldName(), filter2.getFieldName());
+    assertEquals(filter.getFilterOp(), filter2.getFilterOp());
+    assertArrayEquals(filter.getOperands().toArray(), filter2.getOperands().toArray());
+    assertEquals(filter.isFilterIfMissing(), filter2.isFilterIfMissing());
+  }
+  /** EQUALS on the url field: a match is kept, a mismatch is filtered, a missing value follows filterIfMissing. */
+  @Test
+  public void testFilterBasics() {
+    SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String, WebPage>();
+    //set filter field to url
+    filter.setFieldName(WebPage.Field.URL.toString());
+    filter.setFilterOp(FilterOp.EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("example.org"));
+    
+    WebPage page = WebPage.newBuilder().build();
+    page.setUrl(new Utf8("example.org"));
+    assertFalse(filter.filter("irrelevant", page));
+    page.setUrl(new Utf8("mismatch.whatever"));
+    assertTrue(filter.filter("irrelevant", page));
+    page = new WebPage();
+    assertTrue(filter.filter("irrelevant", page));
+    
+    filter.setFilterIfMissing(false);
+    
+    assertFalse(filter.filter("irrelevant", page));
+  }
+  /** NOT_EQUALS on the url field: a url equal to the operand is filtered, any other url is kept. */
+  @Test
+  public void testFilterInequals() {
+    SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String, WebPage>();
+    //set filter field to url
+    filter.setFieldName(WebPage.Field.URL.toString());
+    filter.setFilterOp(FilterOp.NOT_EQUALS);
+    filter.setFilterIfMissing(true);
+    filter.getOperands().add(new Utf8("example.org"));
+    
+    WebPage page = WebPage.newBuilder().build();
+    page.setUrl(new Utf8("example.org"));
+    assertTrue(filter.filter("irrelevant", page));
+    
+    page.setUrl(new Utf8("something.else"));
+    assertFalse(filter.filter("irrelevant", page));
+  }
+  
+}

Modified: gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java (original)
+++ gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java Wed Jan 29 22:02:59 2014
@@ -21,6 +21,7 @@ package org.apache.gora.dynamodb.query;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.ws.impl.QueryWSBase;
@@ -377,4 +378,28 @@ public class DynamoDBQuery<K, T extends 
   public static void setRangeCompOp(ComparisonOperator pRangeCompOp){
     rangeCompOp = pRangeCompOp;
   }
+
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    // TODO not implemented yet: the given filter is discarded
+    
+  }
+
+  @Override
+  public Filter<K, T> getFilter() {
+    // TODO not implemented yet: no filter is ever reported, even after setFilter()
+    return null;
+  }
+
+  @Override
+  public void setLocalFilterEnabled(boolean enable) {
+    // TODO not implemented yet: the flag is discarded
+    
+  }
+
+  @Override
+  public boolean isLocalFilterEnabled() {
+    // TODO not implemented yet: local filtering is always reported as disabled
+    return false;
+  }
 }

Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java Wed Jan 29 22:02:59 2014
@@ -18,7 +18,6 @@
 
 package org.apache.gora.hbase.query;
 
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.QueryBase;

Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java Wed Jan 29 22:02:59 2014
@@ -22,7 +22,7 @@ import java.util.Arrays;
 /**
  * Store family, qualifier tuple 
  */
-class HBaseColumn {
+public class HBaseColumn {
   
   final byte[] family;
   final byte[] qualifier;

Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Wed Jan 29 22:02:59 2014
@@ -42,6 +42,7 @@ import org.apache.gora.hbase.query.HBase
 import org.apache.gora.hbase.query.HBaseScannerResult;
 import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
 import org.apache.gora.hbase.util.HBaseByteInterface;
+import org.apache.gora.hbase.util.HBaseFilterUtil;
 import org.apache.gora.persistency.impl.DirtyListWrapper;
 import org.apache.gora.persistency.impl.DirtyMapWrapper;
 import org.apache.gora.persistency.impl.PersistentBase;
@@ -98,6 +99,8 @@ implements Configurable {
   private final boolean autoCreateSchema = true;
 
   private volatile HBaseMapping mapping;
+  
+  private HBaseFilterUtil<K, T> filterUtil;
 
   private int scannerCaching = SCANNER_CACHING_PROPERTIES_DEFAULT ;
   
@@ -113,7 +116,7 @@ implements Configurable {
       this.conf = HBaseConfiguration.create(getConf());
       admin = new HBaseAdmin(this.conf);
       mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
-      
+      filterUtil = new HBaseFilterUtil<K, T>(this.conf);
     } catch (FileNotFoundException ex) {
       try {
         mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEPRECATED_MAPPING_FILE));
@@ -160,6 +163,10 @@ implements Configurable {
     //return the name of this table
     return mapping.getTableName();
   }
+  /** Returns the HBase mapping held by this store. */
+  public HBaseMapping getMapping() {
+    return mapping;
+  }
 
   @Override
   public void createSchema() {
@@ -387,8 +394,7 @@ implements Configurable {
     }
     List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>(keys.getFirst().length);
     for (int i = 0; i < keys.getFirst().length; i++) {
-      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
-      getServerAddress().getHostname();
+      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getServerAddress().getHostname();
       byte[] startRow = query.getStartKey() != null ? toBytes(query.getStartKey())
           : HConstants.EMPTY_START_ROW;
       byte[] stopRow = query.getEndKey() != null ? toBytes(query.getEndKey())
@@ -440,7 +446,7 @@ implements Configurable {
         ResultScanner scanner = createScanner(query);
   
         org.apache.gora.query.Result<K,T> result
-            = new HBaseScannerResult<K,T>(this,query, scanner);
+            = new HBaseScannerResult<K,T>(this, query, scanner);
   
         return result;
       }
@@ -463,6 +469,13 @@ implements Configurable {
       scan.setStopRow(toBytes(query.getEndKey()));
     }
     addFields(scan, query);
+    if (query.getFilter() != null) {
+      boolean succeeded = filterUtil.setFilter(scan, query.getFilter(), this);
+      if (succeeded) {
+        // don't need local filter
+        query.setLocalFilterEnabled(false);
+      }
+    }
 
     return table.getScanner(scan);
   }

Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+/** Base {@link FilterFactory} that only stores the {@link HBaseFilterUtil} handed to it; filter creation is left to subclasses. */
+public abstract class BaseFactory<K, T extends PersistentBase> implements FilterFactory<K, T> {
+  /** Utility supplied through {@link #setHBaseFitlerUtil}; not initialized here. */
+  private HBaseFilterUtil<K, T> util;
+
+  @Override
+  public HBaseFilterUtil<K, T> getHbaseFitlerUtil() {
+    return util;
+  }
+
+  @Override
+  public void setHBaseFitlerUtil(HBaseFilterUtil<K, T> util) {
+    this.util = util;
+  }
+  
+}

Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.FilterList;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.filter.MapFieldValueFilter;
+import org.apache.gora.filter.SingleFieldValueFilter;
+import org.apache.gora.hbase.store.HBaseColumn;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default {@link FilterFactory}: translates the Gora filters listed by
+ * {@link #getSupportedFilters()} into HBase filters.
+ */
+public class DefaultFactory <K, T extends PersistentBase> extends BaseFactory<K, T> {
+  private static final Log LOG = LogFactory.getLog(DefaultFactory.class);
+
+  /**
+   * @return canonical class names of the Gora filters this factory can translate
+   */
+  @Override
+  public List<String> getSupportedFilters() {
+    List<String> filters = new ArrayList<String>();
+    filters.add(SingleFieldValueFilter.class.getCanonicalName());
+    filters.add(MapFieldValueFilter.class.getCanonicalName());
+    filters.add(FilterList.class.getCanonicalName());
+    return filters;
+  }
+
+  /**
+   * Translates a Gora filter into the equivalent HBase filter.
+   *
+   * @param filter the Gora filter to translate
+   * @param store the store whose mapping resolves field names to HBase columns
+   * @return the HBase filter, or <code>null</code> if the filter, or any filter
+   *         nested in a {@link FilterList}, cannot be translated
+   */
+  @Override
+  public org.apache.hadoop.hbase.filter.Filter createFilter(Filter<K, T> filter, HBaseStore<K, T> store) {
+    if (filter instanceof FilterList) {
+      FilterList<K, T> filterList = (FilterList<K, T>) filter;
+      org.apache.hadoop.hbase.filter.FilterList hbaseFilter = new org.apache.hadoop.hbase.filter.FilterList(
+          Operator.valueOf(filterList.getOperator().name()));
+      for (Filter<K, T> rowFilter : filterList.getFilters()) {
+        FilterFactory<K, T> factory = getHbaseFitlerUtil().getFactory(rowFilter);
+        if (factory == null) {
+          LOG.warn("HBase remote filter factory not yet implemented for " + rowFilter.getClass().getCanonicalName());
+          return null;
+        }
+        org.apache.hadoop.hbase.filter.Filter hbaseRowFilter = factory.createFilter(rowFilter, store);
+        if (hbaseRowFilter == null) {
+          // A partially translated list would not enforce every condition, so
+          // report failure for the whole list, as is done for a missing factory.
+          return null;
+        }
+        hbaseFilter.addFilter(hbaseRowFilter);
+      }
+      return hbaseFilter;
+    } else if (filter instanceof SingleFieldValueFilter) {
+      SingleFieldValueFilter<K, T> fieldFilter = (SingleFieldValueFilter<K, T>) filter;
+      HBaseColumn column = store.getMapping().getColumn(fieldFilter.getFieldName());
+      if (column == null || fieldFilter.getOperands().isEmpty()) {
+        LOG.warn("HBase remote filter needs a mapped column and an operand, field: " + fieldFilter.getFieldName());
+        return null;
+      }
+
+      CompareOp compareOp = getCompareOp(fieldFilter.getFilterOp());
+      byte[] family = column.getFamily();
+      byte[] qualifier = column.getQualifier();
+      byte[] value = HBaseByteInterface.toBytes(fieldFilter.getOperands().get(0));
+      SingleColumnValueFilter hbaseFilter = new SingleColumnValueFilter(family, qualifier, compareOp, value);
+      hbaseFilter.setFilterIfMissing(fieldFilter.isFilterIfMissing());
+      return hbaseFilter;
+    } else if (filter instanceof MapFieldValueFilter) {
+      MapFieldValueFilter<K, T> mapFilter = (MapFieldValueFilter<K, T>) filter;
+      HBaseColumn column = store.getMapping().getColumn(mapFilter.getFieldName());
+      if (column == null || mapFilter.getOperands().isEmpty()) {
+        LOG.warn("HBase remote filter needs a mapped column and an operand, field: " + mapFilter.getFieldName());
+        return null;
+      }
+
+      CompareOp compareOp = getCompareOp(mapFilter.getFilterOp());
+      byte[] family = column.getFamily();
+      byte[] qualifier = HBaseByteInterface.toBytes(mapFilter.getMapKey());
+      byte[] value = HBaseByteInterface.toBytes(mapFilter.getOperands().get(0));
+      SingleColumnValueFilter hbaseFilter = new SingleColumnValueFilter(family, qualifier, compareOp, value);
+      hbaseFilter.setFilterIfMissing(mapFilter.isFilterIfMissing());
+      return hbaseFilter;
+    } else {
+      LOG.warn("HBase remote filter not yet implemented for " + filter.getClass().getCanonicalName());
+      return null;
+    }
+  }
+
+  /**
+   * Maps a Gora {@link FilterOp} to the HBase {@link CompareOp} of the same meaning.
+   *
+   * @throws IllegalArgumentException if the operator has no HBase equivalent
+   */
+  private CompareOp getCompareOp(FilterOp filterOp) {
+    switch (filterOp) {
+      case EQUALS:
+        return CompareOp.EQUAL;
+      case NOT_EQUALS:
+        return CompareOp.NOT_EQUAL;
+      case LESS:
+        return CompareOp.LESS;
+      case LESS_OR_EQUAL:
+        return CompareOp.LESS_OR_EQUAL;
+      case GREATER:
+        return CompareOp.GREATER;
+      case GREATER_OR_EQUAL:
+        return CompareOp.GREATER_OR_EQUAL;
+      default:
+        throw new IllegalArgumentException(filterOp + " no HBase equivalent yet");
+    }
+  }
+}

Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.gora.filter.Filter;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+
+import java.util.List;
+/** Creates HBase filters from Gora filters and names the Gora filter classes it supports. */
+public interface FilterFactory <K, T extends PersistentBase> {
+  void setHBaseFitlerUtil(HBaseFilterUtil<K, T> util);
+  HBaseFilterUtil<K, T> getHbaseFitlerUtil();
+  List<String> getSupportedFilters();
+  org.apache.hadoop.hbase.filter.Filter createFilter(Filter<K, T> filter, HBaseStore<K, T> store);
+}

Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Wed Jan 29 22:02:59 2014
@@ -205,6 +205,8 @@ public class HBaseByteInterface {
       return Bytes.toBytes((String) o);
     } else if (clazz.equals(Utf8.class)) {
       return ((Utf8) o).getBytes();
+    } else if (clazz.isArray() && clazz.getComponentType().equals(Byte.TYPE)) {
+      return (byte[])o;
     }
     throw new RuntimeException("Can't parse data as class: " + clazz);
   }

Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Translates Gora filters into HBase filters by delegating to pluggable
+ * {@link FilterFactory} instances. The factory class names are read from the
+ * <code>gora.hbase.filter.factories</code> configuration property, which falls
+ * back to <code>org.apache.gora.hbase.util.DefaultFactory</code>.
+ *
+ * @param <K>
+ *          the key type of the filters and the store.
+ * @param <T>
+ *          the persistent type of the filters and the store.
+ */
+public class HBaseFilterUtil<K, T extends PersistentBase> {
+  private static final Log LOG = LogFactory.getLog(HBaseFilterUtil.class);
+
+  /** Factories indexed by the filter class names they declare as supported. */
+  private final Map<String, FilterFactory<K, T>> factories = new LinkedHashMap<String, FilterFactory<K, T>>();
+
+  /**
+   * Creates every configured factory and registers it under each filter name
+   * it supports. When two factories declare the same filter name, the one
+   * configured later replaces the earlier one.
+   *
+   * @param conf
+   *          the configuration that names the factory classes.
+   * @throws GoraException
+   *           wrapping any failure while creating or registering a factory.
+   */
+  public HBaseFilterUtil(Configuration conf) throws GoraException {
+    String[] factoryClassNames = conf.getStrings(
+        "gora.hbase.filter.factories",
+        "org.apache.gora.hbase.util.DefaultFactory");
+    for (String factoryClassName : factoryClassNames) {
+      registerFactory(factoryClassName);
+    }
+  }
+
+  /**
+   * Instantiates one factory, registers it for all filters it supports and
+   * finally hands it a reference to this util.
+   */
+  private void registerFactory(String factoryClassName) throws GoraException {
+    try {
+      @SuppressWarnings("unchecked")
+      FilterFactory<K, T> created = (FilterFactory<K, T>) ReflectionUtils.newInstance(factoryClassName);
+      for (String supportedFilter : created.getSupportedFilters()) {
+        factories.put(supportedFilter, created);
+      }
+      // The back-reference is handed over only after the registration is done.
+      created.setHBaseFitlerUtil(this);
+    } catch (Exception e) {
+      throw new GoraException(e);
+    }
+  }
+
+  /**
+   * Finds the factory registered for the class of the given Gora filter.
+   *
+   * @param fitler
+   *          The Gora filter; must not be <code>null</code>.
+   * @return the matching factory, or <code>null</code> if none is registered
+   *         under the filter's canonical class name.
+   */
+  public FilterFactory<K, T> getFactory(Filter<K, T> fitler) {
+    String filterClassName = fitler.getClass().getCanonicalName();
+    return factories.get(filterClassName);
+  }
+
+  /**
+   * Translates a Gora filter to a HBase filter and sets it on the Scan. The
+   * Scan is left untouched when no factory is registered for the filter or
+   * when the factory yields no HBase filter; both cases are logged as
+   * warnings.
+   *
+   * @param scan
+   *          The Scan that receives the HBase filter.
+   * @param filter
+   *          The Gora filter.
+   * @param store
+   *          The HBaseStore.
+   * @return <code>true</code> only if a remote filter was applied to the Scan.
+   */
+  public boolean setFilter(Scan scan, Filter<K, T> filter, HBaseStore<K, T> store) {
+    FilterFactory<K, T> responsible = getFactory(filter);
+    if (responsible == null) {
+      LOG.warn("HBase remote filter factory not yet implemented for " + filter.getClass().getCanonicalName());
+      return false;
+    }
+
+    org.apache.hadoop.hbase.filter.Filter translated = responsible.createFilter(filter, store);
+    if (translated == null) {
+      LOG.warn("HBase remote filter not yet implemented for " + filter.getClass().getCanonicalName());
+      return false;
+    }
+
+    scan.setFilter(translated);
+    return true;
+  }
+
+}