You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/01/22 17:27:19 UTC
svn commit: r1560407 - in /gora/trunk: ./
gora-core/src/main/java/org/apache/gora/query/
gora-core/src/main/java/org/apache/gora/query/impl/
gora-core/src/main/java/org/apache/gora/query/ws/impl/
gora-dynamodb/src/main/java/org/apache/gora/dynamodb/que...
Author: lewismc
Date: Wed Jan 22 16:27:18 2014
New Revision: 1560407
URL: http://svn.apache.org/r1560407
Log:
GORA-119
Modified:
gora/trunk/CHANGES.txt
gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java
gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
Modified: gora/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/trunk/CHANGES.txt?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/CHANGES.txt (original)
+++ gora/trunk/CHANGES.txt Wed Jan 22 16:27:18 2014
@@ -4,6 +4,8 @@
Gora Change Log
+* GORA-119 implement a filter enabled scan in gora (ferdy, kturner, enis, Tien Nguyen Manh via lewismc)
+
* GORA-290 StatefulHashMap removes the entry when put with same value (Alparslan Avci via hsaputra)
* GORA-231 Provide better error handling in AccumuloStore.readMapping when file does not exist (Apostolos Giannakidis)
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/Query.java Wed Jan 22 16:27:18 2014
@@ -18,8 +18,7 @@
package org.apache.gora.query;
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
@@ -71,6 +70,30 @@ public interface Query<K, T extends Pers
void setFields(String... fieldNames);
String[] getFields();
+
+ /**
+ * @param filter the filter to set on this query.
+ */
+ public void setFilter(Filter<K, T> filter);
+
+ /**
+ * @return The filter on this query, or <code>null</code> if none.
+ */
+ public Filter<K, T> getFilter();
+
+ /**
+ * Set whether the local filter is enabled. This is usually called by
+ * data store implementations that install the filter remotely
+ * (for efficiency reasons) and therefore disable the local filter.
+ * @param enable <code>true</code> to apply the filter locally, <code>false</code> to skip local filtering.
+ */
+ void setLocalFilterEnabled(boolean enable);
+
+ /**
+ * @return Whether the local filter is enabled.
+ * See {@link #setLocalFilterEnabled(boolean)}.
+ */
+ boolean isLocalFilterEnabled();
/* Dimension : key */
void setKey(K key);
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java Wed Jan 22 16:27:18 2014
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
@@ -127,6 +128,16 @@ public String[] getLocations() {
public void setLimit(long limit) {
baseQuery.setLimit(limit);
}
+
+ @Override
+ public Filter<K, T> getFilter() {
+ return baseQuery.getFilter();
+ }
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ baseQuery.setFilter(filter);
+ }
@Override
public void write(DataOutput out) throws IOException {
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java Wed Jan 22 16:27:18 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
@@ -56,7 +57,8 @@ public abstract class QueryBase<K, T ext
protected long startTime = -1;
protected long endTime = -1;
- protected String filter;
+ protected Filter<K, T> filter;
+ protected boolean localFilterEnabled=true;
protected long limit = -1;
@@ -81,16 +83,6 @@ public abstract class QueryBase<K, T ext
return dataStore;
}
-// @Override
-// public void setQueryString(String queryString) {
-// this.queryString = queryString;
-// }
-//
-// @Override
-// public String getQueryString() {
-// return queryString;
-// }
-
@Override
public void setFields(String... fields) {
this.fields = fields;
@@ -100,7 +92,27 @@ public abstract class QueryBase<K, T ext
public String[] getFields() {
return fields;
}
-
+
+ @Override
+ public Filter<K, T> getFilter() {
+ return filter;
+ }
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ this.filter=filter;
+ }
+
+ @Override
+ public boolean isLocalFilterEnabled() {
+ return localFilterEnabled;
+ }
+
+ @Override
+ public void setLocalFilterEnabled(boolean enable) {
+ this.localFilterEnabled=enable;
+ }
+
@Override
public void setKey(K key) {
setKeyRange(key, key);
@@ -176,16 +188,6 @@ public String[] getFields() {
return endTime;
}
-// @Override
-// public void setFilter(String filter) {
-// this.filter = filter;
-// }
-//
-// @Override
-// public String getFilter() {
-// return filter;
-// }
-
@Override
public void setLimit(long limit) {
this.limit = limit;
@@ -225,12 +227,20 @@ public String[] getFields() {
startKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
if(!nullFields[3])
endKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
- if(!nullFields[4])
- filter = Text.readString(in);
+ if(!nullFields[4]) {
+ String filterClass = Text.readString(in);
+ try {
+ filter = (Filter<K, T>) ReflectionUtils.newInstance(ClassLoadingUtils.loadClass(filterClass), conf);
+ filter.readFields(in);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
startTime = WritableUtils.readVLong(in);
endTime = WritableUtils.readVLong(in);
limit = WritableUtils.readVLong(in);
+ localFilterEnabled = in.readBoolean();
}
//@Override
@@ -250,12 +260,15 @@ public String[] getFields() {
IOUtils.serialize(getConf(), out, startKey, dataStore.getKeyClass());
if(endKey != null)
IOUtils.serialize(getConf(), out, endKey, dataStore.getKeyClass());
- if(filter != null)
- Text.writeString(out, filter);
+ if(filter != null) {
+ Text.writeString(out, filter.getClass().getCanonicalName());
+ filter.write(out);
+ }
WritableUtils.writeVLong(out, getStartTime());
WritableUtils.writeVLong(out, getEndTime());
WritableUtils.writeVLong(out, getLimit());
+ out.writeBoolean(localFilterEnabled);
}
@SuppressWarnings({ "rawtypes" })
@@ -271,6 +284,7 @@ public String[] getFields() {
builder.append(endKey, that.endKey);
builder.append(filter, that.filter);
builder.append(limit, that.limit);
+ builder.append(localFilterEnabled, that.localFilterEnabled);
return builder.isEquals();
}
return false;
@@ -286,6 +300,7 @@ public String[] getFields() {
builder.append(endKey);
builder.append(filter);
builder.append(limit);
+ builder.append(localFilterEnabled);
return builder.toHashCode();
}
@@ -298,6 +313,7 @@ public String[] getFields() {
builder.append("endKey", endKey);
builder.append("filter", filter);
builder.append("limit", limit);
+ builder.append("localFilterEnabled", localFilterEnabled);
return builder.toString();
}
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java Wed Jan 22 16:27:18 2014
@@ -18,13 +18,14 @@
package org.apache.gora.query.impl;
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
+import java.io.IOException;
+
/**
* Base class for {@link Result} implementations.
*/
@@ -102,17 +103,37 @@ public abstract class ResultBase<K, T ex
@Override
public final boolean next() throws Exception, IOException {
- if(isLimitReached()) {
- return false;
- }
-
- clear();
+ if(isLimitReached()) {
+ return false;
+ }
+
+ boolean ret;
+ do {
+ clear();
+ persistent = getOrCreatePersistent(persistent);
+ ret = nextInner();
+ if (ret == false) {
+ //this is the end
+ break;
+ }
+ //we keep looping until we get a row that is not filtered out
+ } while (filter(key, persistent));
+
+ if(ret) ++offset;
+ return ret;
+ }
+
+ protected boolean filter(K key, T persistent) {
+ if (!query.isLocalFilterEnabled()) {
+ return false;
+ }
+
+ Filter<K, T> filter = query.getFilter();
+ if (filter == null) {
+ return false;
+ }
- persistent = getOrCreatePersistent(persistent);
- boolean ret = nextInner();
-
- if(ret) ++offset;
- return ret;
+ return filter.filter(key, persistent);
}
@Override
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java Wed Jan 22 16:27:18 2014
@@ -18,15 +18,16 @@
package org.apache.gora.query.ws.impl;
-import java.util.Arrays;
-
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.store.DataStore;
+import java.util.Arrays;
+
/**
- * Implementation for {@link PartitionQuery}.
+ * Webservices implementation for {@link PartitionQuery}.
*/
//TODO this class should be reviewed when a web service backed datastore has the
// ability to write partition queries
@@ -182,6 +183,27 @@ public class PartitionWSQueryImpl<K, T e
public void setLimit(long limit) {
baseQuery.setLimit(limit);
}
+
+ @Override
+ public Filter<K, T> getFilter() {
+ return filter;
+ }
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ this.filter=filter;
+ }
+
+ @Override
+ public boolean isLocalFilterEnabled() {
+ return localFilterEnabled;
+ }
+
+ @Override
+ public void setLocalFilterEnabled(boolean enable) {
+ this.localFilterEnabled=enable;
+ }
+
@Override
@SuppressWarnings({ "rawtypes" })
Modified: gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java (original)
+++ gora/trunk/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java Wed Jan 22 16:27:18 2014
@@ -21,6 +21,7 @@ package org.apache.gora.query.ws.impl;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
@@ -58,11 +59,8 @@ public abstract class QueryWSBase<K, T e
protected long startTime = -1;
protected long endTime = -1;
- /**
- * Query filter
- */
- protected String filter;
-
+ protected Filter<K, T> filter;
+ protected boolean localFilterEnabled=true;
/**
* Max number of results to be retrieved
*/
Modified: gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java (original)
+++ gora/trunk/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java Wed Jan 22 16:27:18 2014
@@ -21,6 +21,7 @@ package org.apache.gora.dynamodb.query;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.ws.impl.QueryWSBase;
@@ -377,4 +378,28 @@ public class DynamoDBQuery<K, T extends
public static void setRangeCompOp(ComparisonOperator pRangeCompOp){
rangeCompOp = pRangeCompOp;
}
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Filter<K, T> getFilter() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setLocalFilterEnabled(boolean enable) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isLocalFilterEnabled() {
+ // TODO Auto-generated method stub
+ return false;
+ }
}
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java Wed Jan 22 16:27:18 2014
@@ -18,7 +18,6 @@
package org.apache.gora.hbase.query;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.QueryBase;
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java Wed Jan 22 16:27:18 2014
@@ -22,7 +22,7 @@ import java.util.Arrays;
/**
* Store family, qualifier tuple
*/
-class HBaseColumn {
+public class HBaseColumn {
final byte[] family;
final byte[] qualifier;
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Wed Jan 22 16:27:18 2014
@@ -45,6 +45,7 @@ import org.apache.gora.hbase.query.HBase
import org.apache.gora.hbase.query.HBaseScannerResult;
import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
import org.apache.gora.hbase.util.HBaseByteInterface;
+import org.apache.gora.hbase.util.HBaseFilterUtil;
import org.apache.gora.persistency.ListGenericArray;
import org.apache.gora.persistency.State;
import org.apache.gora.persistency.StateManager;
@@ -99,6 +100,8 @@ implements Configurable {
private final boolean autoCreateSchema = true;
private volatile HBaseMapping mapping;
+
+ private HBaseFilterUtil<K, T> filterUtil;
private int scannerCaching = SCANNER_CACHING_PROPERTIES_DEFAULT ;
@@ -114,7 +117,7 @@ implements Configurable {
this.conf = HBaseConfiguration.create(getConf());
admin = new HBaseAdmin(this.conf);
mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
-
+ filterUtil = new HBaseFilterUtil<K, T>(this.conf);
} catch (FileNotFoundException ex) {
try {
mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEPRECATED_MAPPING_FILE));
@@ -161,6 +164,10 @@ implements Configurable {
//return the name of this table
return mapping.getTableName();
}
+
+ public HBaseMapping getMapping() {
+ return mapping;
+ }
@Override
public void createSchema() {
@@ -276,6 +283,8 @@ implements Configurable {
hasDeletes = true;
delete.deleteColumn(hcol.getFamily(), qual);
break;
+ default :
+ break;
}
}
} else {
@@ -416,8 +425,7 @@ implements Configurable {
}
List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
- String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
- getServerAddress().getHostname();
+ String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getServerAddress().getHostname();
byte[] startRow = query.getStartKey() != null ? toBytes(query.getStartKey())
: HConstants.EMPTY_START_ROW;
byte[] stopRow = query.getEndKey() != null ? toBytes(query.getEndKey())
@@ -469,7 +477,7 @@ implements Configurable {
ResultScanner scanner = createScanner(query);
org.apache.gora.query.Result<K,T> result
- = new HBaseScannerResult<K,T>(this,query, scanner);
+ = new HBaseScannerResult<K,T>(this, query, scanner);
return result;
}
@@ -492,6 +500,13 @@ implements Configurable {
scan.setStopRow(toBytes(query.getEndKey()));
}
addFields(scan, query);
+ if (query.getFilter() != null) {
+ boolean succeeded = filterUtil.setFilter(scan, query.getFilter(), this);
+ if (succeeded) {
+ // don't need local filter
+ query.setLocalFilterEnabled(false);
+ }
+ }
return table.getScanner(scan);
}
Modified: gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1560407&r1=1560406&r2=1560407&view=diff
==============================================================================
--- gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/trunk/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Wed Jan 22 16:27:18 2014
@@ -237,6 +237,8 @@ public class HBaseByteInterface {
return Bytes.toBytes((String) o);
} else if (clazz.equals(Utf8.class)) {
return ((Utf8) o).getBytes();
+ } else if (clazz.isArray() && clazz.getComponentType().equals(Byte.TYPE)) {
+ return (byte[])o;
}
throw new RuntimeException("Can't parse data as class: " + clazz);
}