You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/01/22 17:27:19 UTC

svn commit: r1560407 - in /gora/trunk: ./ gora-core/src/main/java/org/apache/gora/query/ gora-core/src/main/java/org/apache/gora/query/impl/ gora-core/src/main/java/org/apache/gora/query/ws/impl/ gora-dynamodb/src/main/java/org/apache/gora/dynamodb/que...

Author: lewismc
Date: Wed Jan 22 16:27:18 2014
New Revision: 1560407

URL: http://svn.apache.org/r1560407
Log:
GORA-119 

Modified:
    gora/trunk/CHANGES.txt
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
    gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
    gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
    gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java

Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Wed Jan 22 16:27:18 2014
@@ -4,6 +4,8 @@
 
 Gora Change Log
 
+* GORA-119 implement a filter enabled scan in gora (ferdy, kturner, enis, Tien Nguyen Manh via lewismc)
+
 * GORA-290 StatefulHashMap removes the entry when put with same value (Alparslan Avci via hsaputra)
 
 * GORA-231 Provide better error handling in AccumuloStore.readMapping when file does not exist (Apostolos Giannakidis)

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java Wed Jan 22 16:27:18 2014
@@ -18,8 +18,7 @@
 
 package org.apache.gora.query;
 
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.store.DataStore;
 
@@ -71,6 +70,30 @@ public interface Query<K, T extends Pers
   void setFields(String... fieldNames);
 
   String[] getFields();
+  
+  /**
+   * @param filter The filter to set on this query.
+   */
+  public void setFilter(Filter<K, T> filter);
+  
+  /**
+   * @return The filter on this query, or <code>null</code> if none.
+   */
+  public Filter<K, T> getFilter();
+  
+  /**
+   * Set whether the local filter is enabled. This is usually called by
+   * data store implementations that install the filter remotely
+   * (for efficiency reasons) and therefore disable the local filter.
+   * @param enable Whether the local filter should be enabled.
+   */
+  void setLocalFilterEnabled(boolean enable);
+  
+  /**
+   * @return Whether the local filter is enabled.
+   * See {@link #setLocalFilterEnabled(boolean)}.
+   */
+  boolean isLocalFilterEnabled();
 
   /* Dimension : key */ 
   void setKey(K key);

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java Wed Jan 22 16:27:18 2014
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -127,6 +128,16 @@ public String[] getLocations() {
   public void setLimit(long limit) {
     baseQuery.setLimit(limit);
   }
+  
+  @Override
+  public Filter<K, T> getFilter() {
+    return baseQuery.getFilter();
+  }
+  
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    baseQuery.setFilter(filter);
+  }
 
   @Override
   public void write(DataOutput out) throws IOException {

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java Wed Jan 22 16:27:18 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -56,7 +57,8 @@ public abstract class QueryBase<K, T ext
   protected long startTime = -1;
   protected long endTime = -1;
 
-  protected String filter;
+  protected Filter<K, T> filter;
+  protected boolean localFilterEnabled=true;
 
   protected long limit = -1;
 
@@ -81,16 +83,6 @@ public abstract class QueryBase<K, T ext
     return dataStore;
   }
 
-//  @Override
-//  public void setQueryString(String queryString) {
-//    this.queryString = queryString;
-//  }
-//
-//  @Override
-//  public String getQueryString() {
-//    return queryString;
-//  }
-
   @Override
   public void setFields(String... fields) {
     this.fields = fields;
@@ -100,7 +92,27 @@ public abstract class QueryBase<K, T ext
 public String[] getFields() {
     return fields;
   }
-
+  
+  @Override
+  public Filter<K, T> getFilter() {
+    return filter;
+  }
+  
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    this.filter=filter;
+  }
+  
+  @Override
+  public boolean isLocalFilterEnabled() {
+    return localFilterEnabled;
+  }
+  
+  @Override
+  public void setLocalFilterEnabled(boolean enable) {
+    this.localFilterEnabled=enable;
+  }
+  
   @Override
   public void setKey(K key) {
     setKeyRange(key, key);
@@ -176,16 +188,6 @@ public String[] getFields() {
     return endTime;
   }
 
-//  @Override
-//  public void setFilter(String filter) {
-//    this.filter = filter;
-//  }
-//
-//  @Override
-//  public String getFilter() {
-//    return filter;
-//  }
-
   @Override
   public void setLimit(long limit) {
     this.limit = limit;
@@ -225,12 +227,20 @@ public String[] getFields() {
       startKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
     if(!nullFields[3])
       endKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
-    if(!nullFields[4])
-      filter = Text.readString(in);
+    if(!nullFields[4]) {
+      String filterClass = Text.readString(in);
+      try {
+        filter = (Filter<K, T>) ReflectionUtils.newInstance(ClassLoadingUtils.loadClass(filterClass), conf);
+        filter.readFields(in);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+    }
 
     startTime = WritableUtils.readVLong(in);
     endTime = WritableUtils.readVLong(in);
     limit = WritableUtils.readVLong(in);
+    localFilterEnabled = in.readBoolean(); 
   }
 
   //@Override
@@ -250,12 +260,15 @@ public String[] getFields() {
       IOUtils.serialize(getConf(), out, startKey, dataStore.getKeyClass());
     if(endKey != null)
       IOUtils.serialize(getConf(), out, endKey, dataStore.getKeyClass());
-    if(filter != null)
-      Text.writeString(out, filter);
+    if(filter != null) {
+      Text.writeString(out, filter.getClass().getCanonicalName());
+      filter.write(out);
+    }
 
     WritableUtils.writeVLong(out, getStartTime());
     WritableUtils.writeVLong(out, getEndTime());
     WritableUtils.writeVLong(out, getLimit());
+    out.writeBoolean(localFilterEnabled);
   }
 
   @SuppressWarnings({ "rawtypes" })
@@ -271,6 +284,7 @@ public String[] getFields() {
       builder.append(endKey, that.endKey);
       builder.append(filter, that.filter);
       builder.append(limit, that.limit);
+      builder.append(localFilterEnabled, that.localFilterEnabled);
       return builder.isEquals();
     }
     return false;
@@ -286,6 +300,7 @@ public String[] getFields() {
     builder.append(endKey);
     builder.append(filter);
     builder.append(limit);
+    builder.append(localFilterEnabled);
     return builder.toHashCode();
   }
 
@@ -298,6 +313,7 @@ public String[] getFields() {
     builder.append("endKey", endKey);
     builder.append("filter", filter);
     builder.append("limit", limit);
+    builder.append("localFilterEnabled", localFilterEnabled);
 
     return builder.toString();
   }

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java Wed Jan 22 16:27:18 2014
@@ -18,13 +18,14 @@
 
 package org.apache.gora.query.impl;
 
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
 
+import java.io.IOException;
+
 /**
  * Base class for {@link Result} implementations.
  */
@@ -102,17 +103,37 @@ public abstract class ResultBase<K, T ex
   
   @Override
   public final boolean next() throws Exception, IOException {
-	  if(isLimitReached()) {
-		  return false;
-	  }
-	    
-	  clear();
+    if(isLimitReached()) {
+      return false;
+    }
+      
+    boolean ret;
+    do {
+      clear();
+      persistent = getOrCreatePersistent(persistent);
+      ret = nextInner();
+      if (ret == false) {
+        //this is the end
+        break;
+      }
+      //we keep looping until we get a row that is not filtered out
+    } while (filter(key, persistent));
+    
+    if(ret) ++offset;
+    return ret;
+  }
+  
+  protected boolean filter(K key, T persistent) {
+    if (!query.isLocalFilterEnabled()) {
+      return false;
+    }
+    
+    Filter<K, T> filter = query.getFilter();
+    if (filter == null) {
+      return false;
+    }
     
-	  persistent = getOrCreatePersistent(persistent);
-	  boolean ret = nextInner();
-	  
-	  if(ret) ++offset;
-	  return ret;
+    return filter.filter(key, persistent);
   }
   
   @Override

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java Wed Jan 22 16:27:18 2014
@@ -18,15 +18,16 @@
 
 package org.apache.gora.query.ws.impl;
 
-import java.util.Arrays;
-
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.store.DataStore;
 
+import java.util.Arrays;
+
 /**
- * Implementation for {@link PartitionQuery}.
+ * Webservices implementation for {@link PartitionQuery}.
  */
 //TODO this class should be reviewed when a web service backed datastore has the
 // ability to write partition queries
@@ -182,6 +183,27 @@ public class PartitionWSQueryImpl<K, T e
   public void setLimit(long limit) {
     baseQuery.setLimit(limit);
   }
+  
+  @Override
+  public Filter<K, T> getFilter() {
+    return filter;
+  }
+  
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    this.filter=filter;
+  }
+  
+  @Override
+  public boolean isLocalFilterEnabled() {
+    return localFilterEnabled;
+  }
+  
+  @Override
+  public void setLocalFilterEnabled(boolean enable) {
+    this.localFilterEnabled=enable;
+  }
+  
 
   @Override
   @SuppressWarnings({ "rawtypes" })

Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java Wed Jan 22 16:27:18 2014
@@ -21,6 +21,7 @@ package org.apache.gora.query.ws.impl;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -58,11 +59,8 @@ public abstract class QueryWSBase<K, T e
   protected long startTime = -1;
   protected long endTime = -1;
 
-  /**
-   * Query filter
-   */
-  protected String filter;
-
+  protected Filter<K, T> filter;
+  protected boolean localFilterEnabled=true;
   /**
    * Max number of results to be retrieved
    */

Modified: gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java (original)
+++ gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java Wed Jan 22 16:27:18 2014
@@ -21,6 +21,7 @@ package org.apache.gora.dynamodb.query;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.ws.impl.QueryWSBase;
@@ -377,4 +378,28 @@ public class DynamoDBQuery<K, T extends 
   public static void setRangeCompOp(ComparisonOperator pRangeCompOp){
     rangeCompOp = pRangeCompOp;
   }
+
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public Filter<K, T> getFilter() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void setLocalFilterEnabled(boolean enable) {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public boolean isLocalFilterEnabled() {
+    // TODO Auto-generated method stub
+    return false;
+  }
 }

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java Wed Jan 22 16:27:18 2014
@@ -18,7 +18,6 @@
 
 package org.apache.gora.hbase.query;
 
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.impl.QueryBase;

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java Wed Jan 22 16:27:18 2014
@@ -22,7 +22,7 @@ import java.util.Arrays;
 /**
  * Store family, qualifier tuple 
  */
-class HBaseColumn {
+public class HBaseColumn {
   
   final byte[] family;
   final byte[] qualifier;

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Wed Jan 22 16:27:18 2014
@@ -45,6 +45,7 @@ import org.apache.gora.hbase.query.HBase
 import org.apache.gora.hbase.query.HBaseScannerResult;
 import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
 import org.apache.gora.hbase.util.HBaseByteInterface;
+import org.apache.gora.hbase.util.HBaseFilterUtil;
 import org.apache.gora.persistency.ListGenericArray;
 import org.apache.gora.persistency.State;
 import org.apache.gora.persistency.StateManager;
@@ -99,6 +100,8 @@ implements Configurable {
   private final boolean autoCreateSchema = true;
 
   private volatile HBaseMapping mapping;
+  
+  private HBaseFilterUtil<K, T> filterUtil;
 
   private int scannerCaching = SCANNER_CACHING_PROPERTIES_DEFAULT ;
   
@@ -114,7 +117,7 @@ implements Configurable {
       this.conf = HBaseConfiguration.create(getConf());
       admin = new HBaseAdmin(this.conf);
       mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
-      
+      filterUtil = new HBaseFilterUtil<K, T>(this.conf);
     } catch (FileNotFoundException ex) {
       try {
         mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEPRECATED_MAPPING_FILE));
@@ -161,6 +164,10 @@ implements Configurable {
     //return the name of this table
     return mapping.getTableName();
   }
+  
+  public HBaseMapping getMapping() {
+    return mapping;
+  }
 
   @Override
   public void createSchema() {
@@ -276,6 +283,8 @@ implements Configurable {
                     hasDeletes = true;
                     delete.deleteColumn(hcol.getFamily(), qual);
                     break;
+                  default :
+                    break;
                 }
               }
             } else {
@@ -416,8 +425,7 @@ implements Configurable {
     }
     List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>(keys.getFirst().length);
     for (int i = 0; i < keys.getFirst().length; i++) {
-      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
-      getServerAddress().getHostname();
+      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getServerAddress().getHostname();
       byte[] startRow = query.getStartKey() != null ? toBytes(query.getStartKey())
           : HConstants.EMPTY_START_ROW;
       byte[] stopRow = query.getEndKey() != null ? toBytes(query.getEndKey())
@@ -469,7 +477,7 @@ implements Configurable {
         ResultScanner scanner = createScanner(query);
   
         org.apache.gora.query.Result<K,T> result
-            = new HBaseScannerResult<K,T>(this,query, scanner);
+            = new HBaseScannerResult<K,T>(this, query, scanner);
   
         return result;
       }
@@ -492,6 +500,13 @@ implements Configurable {
       scan.setStopRow(toBytes(query.getEndKey()));
     }
     addFields(scan, query);
+    if (query.getFilter() != null) {
+      boolean succeeded = filterUtil.setFilter(scan, query.getFilter(), this);
+      if (succeeded) {
+        // don't need local filter
+        query.setLocalFilterEnabled(false);
+      }
+    }
 
     return table.getScanner(scan);
   }

Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Wed Jan 22 16:27:18 2014
@@ -237,6 +237,8 @@ public class HBaseByteInterface {
       return Bytes.toBytes((String) o);
     } else if (clazz.equals(Utf8.class)) {
       return ((Utf8) o).getBytes();
+    } else if (clazz.isArray() && clazz.getComponentType().equals(Byte.TYPE)) {
+      return (byte[])o;
     }
     throw new RuntimeException("Can't parse data as class: " + clazz);
   }