You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2014/01/29 23:02:59 UTC
svn commit: r1562607 - in /gora/branches/GORA_94: ./
gora-core/src/main/java/org/apache/gora/filter/
gora-core/src/main/java/org/apache/gora/query/
gora-core/src/main/java/org/apache/gora/query/impl/
gora-core/src/main/java/org/apache/gora/query/ws/imp...
Author: lewismc
Date: Wed Jan 29 22:02:59 2014
New Revision: 1562607
URL: http://svn.apache.org/r1562607
Log:
GORA-119 implement a filter enabled scan in gora
Added:
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java
Modified:
gora/branches/GORA_94/CHANGES.txt
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
Modified: gora/branches/GORA_94/CHANGES.txt
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/CHANGES.txt?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/CHANGES.txt (original)
+++ gora/branches/GORA_94/CHANGES.txt Wed Jan 29 22:02:59 2014
@@ -4,6 +4,8 @@
Gora Change Log
+* GORA-119 implement a filter enabled scan in gora (ferdy, kturner, enis, Tien Nguyen Manh via lewismc)
+
* GORA-231 Provide better error handling in AccumuloStore.readMapping when file does not exist. (apgiannakidis via lewismc)
* GORA-283 Specify field name for types not being considered in gora-cassandra (lewismc)
Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/Filter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Contract for filtering (possibly including modification) of rows.
+ * By default all filtering is done client side, in the generic Gora classes.
+ * Datastore implementations can decide to install remote filters instead,
+ * when possible.
+ *
+ * @param <K> the key type of the rows being filtered
+ * @param <T> the persistent type of the rows being filtered
+ */
+public interface Filter<K, T extends Persistent> extends Writable {
+
+  /**
+   * Filters the given key and persistent. Modification is possible.
+   *
+   * @param key the key of the row under consideration
+   * @param persistent the persistent object of the row under consideration
+   * @return <code>true</code> if the row is filtered out (excluded),
+   *         <code>false</code> otherwise.
+   */
+  boolean filter(K key, T persistent);
+}
Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterList.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.filter;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.ReflectionUtils;
+import org.apache.hadoop.io.Text;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FilterList<K, T extends PersistentBase> implements Filter<K, T> {
+ /** set operator */
+ public static enum Operator {
+ /** !AND */
+ MUST_PASS_ALL,
+ /** !OR */
+ MUST_PASS_ONE
+ }
+
+ private Operator operator = Operator.MUST_PASS_ALL;
+ private List<Filter<K, T>> filters = new ArrayList<Filter<K, T>>();
+
+ public FilterList() {
+ }
+
+ public FilterList(final List<Filter<K, T>> rowFilters) {
+ this.filters = rowFilters;
+ }
+
+ public FilterList(final Operator operator) {
+ this.operator = operator;
+ }
+
+ public FilterList(final Operator operator, final List<Filter<K, T>> rowFilters) {
+ this.filters = rowFilters;
+ this.operator = operator;
+ }
+
+ public List<Filter<K, T>> getFilters() {
+ return filters;
+ }
+
+ public Operator getOperator() {
+ return operator;
+ }
+
+ public void addFilter(Filter<K, T> filter) {
+ this.filters.add(filter);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ byte opByte = in.readByte();
+ operator = Operator.values()[opByte];
+ int size = in.readInt();
+ if (size > 0) {
+ filters = new ArrayList<Filter<K, T>>(size);
+ try {
+ for (int i = 0; i < size; i++) {
+ @SuppressWarnings("unchecked")
+ Class<? extends Filter<K, T>> cls = (Class<? extends Filter<K, T>>) Class.forName(Text.readString(in)).asSubclass(Filter.class);
+ Filter<K, T> filter = ReflectionUtils.newInstance(cls);
+ filter.readFields(in);
+ filters.add(filter);
+ }
+ } catch (Exception e) {
+ throw (IOException)new IOException("Failed filter init").initCause(e);
+ }
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(operator.ordinal());
+ out.writeInt(filters.size());
+ for (Filter<K, T> filter : filters) {
+ Text.writeString(out, filter.getClass().getName());
+ filter.write(out);
+ }
+ }
+
+ @Override
+ public boolean filter(K key, T persistent) {
+ // TODO not yet implemented
+ return false;
+ }
+
+}
Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/FilterOp.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+/**
+ * Defines a set of common filter compare operations.
+ */
+public enum FilterOp {
+ EQUALS,
+ NOT_EQUALS,
+ LESS,
+ LESS_OR_EQUAL,
+ GREATER,
+ GREATER_OR_EQUAL,
+}
Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/MapFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A filter that checks the value stored under one key of a map field in the
+ * persistent.
+ * <p>
+ * Only {@link FilterOp#EQUALS} and {@link FilterOp#NOT_EQUALS} are implemented,
+ * and only the first operand is compared with the map value.
+ *
+ * @param <K> the key type of the rows being filtered
+ * @param <T> the persistent type of the rows being filtered
+ */
+public class MapFieldValueFilter<K, T extends PersistentBase> implements Filter<K, T> {
+
+  /** Name of the map field to inspect, as declared in the persistent's schema. */
+  protected String fieldName;
+  /** Key whose value is looked up inside the map field. */
+  protected Utf8 mapKey;
+  /** Comparison applied between the map value and the first operand. */
+  protected FilterOp filterOp;
+  /** Operands of the comparison; strings must be given as {@link Utf8}. */
+  protected List<Object> operands = new ArrayList<Object>();
+  /** Result returned by {@link #filter} when the map or the map entry is missing. */
+  protected boolean filterIfMissing = false;
+
+  private Configuration conf = new Configuration(); // just create empty conf,
+                                                    // needed for ObjectWritable
+
+  /**
+   * Serializes field name, map key, operation, operands and the
+   * filter-if-missing flag, in that order.
+   *
+   * @throws IllegalStateException if an operand is a {@link String}
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, fieldName);
+    Text.writeString(out, mapKey.toString());
+    WritableUtils.writeEnum(out, filterOp);
+    WritableUtils.writeVInt(out, operands.size());
+    for (int i = 0; i < operands.size(); i++) {
+      Object operand = operands.get(i);
+      if (operand instanceof String) {
+        throw new IllegalStateException("Use Utf8 instead of String for operands");
+      }
+      if (operand instanceof Utf8) {
+        // Utf8 goes over the wire as a String; readFields converts it back
+        operand = operand.toString();
+      }
+      // boxed values are written with their primitive type as declared class
+      if (operand instanceof Boolean) {
+        ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
+      } else if (operand instanceof Character) {
+        ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
+      } else if (operand instanceof Byte) {
+        ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
+      } else if (operand instanceof Short) {
+        ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
+      } else if (operand instanceof Integer) {
+        ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
+      } else if (operand instanceof Long) {
+        ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
+      } else if (operand instanceof Float) {
+        ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
+      } else if (operand instanceof Double) {
+        ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
+      } else if (operand instanceof Void) {
+        ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
+      } else {
+        ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
+      }
+    }
+    out.writeBoolean(filterIfMissing);
+  }
+
+  /**
+   * Restores the state written by {@link #write(DataOutput)}. The current
+   * operands are cleared first; operands read as {@link String} are converted
+   * back to {@link Utf8}.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fieldName = Text.readString(in);
+    mapKey = new Utf8(Text.readString(in));
+    filterOp = WritableUtils.readEnum(in, FilterOp.class);
+    operands.clear();
+    int operandsSize = WritableUtils.readVInt(in);
+    for (int i = 0; i < operandsSize; i++) {
+      Object operand = ObjectWritable.readObject(in, conf);
+      if (operand instanceof String) {
+        operand = new Utf8((String) operand);
+      }
+      operands.add(operand);
+    }
+    filterIfMissing = in.readBoolean();
+  }
+
+  /**
+   * Compares the value stored under {@link #getMapKey()} in the configured map
+   * field with the first operand.
+   *
+   * @param key the key of the row under consideration (not used)
+   * @param persistent the persistent object holding the map field
+   * @return <code>true</code> if the row is filtered out (excluded),
+   *         <code>false</code> otherwise; {@link #isFilterIfMissing()} when
+   *         the map or the map entry is <code>null</code>
+   * @throws IllegalStateException if the configured operation is neither
+   *         {@link FilterOp#EQUALS} nor {@link FilterOp#NOT_EQUALS}
+   */
+  @Override
+  public boolean filter(K key, T persistent) {
+    int fieldIndex = persistent.getSchema().getField(fieldName).pos();
+    @SuppressWarnings("unchecked")
+    Map<Utf8, ?> fieldValue = (Map<Utf8, ?>) persistent.get(fieldIndex);
+    if (fieldValue == null) {
+      return filterIfMissing;
+    }
+    Object value = fieldValue.get(mapKey);
+    Object operand = operands.get(0);
+    if (value == null) {
+      return filterIfMissing;
+    }
+    if (filterOp.equals(FilterOp.EQUALS)) {
+      boolean equals = value.equals(operand);
+      return !equals;
+    } else if (filterOp.equals(FilterOp.NOT_EQUALS)) {
+      boolean equals = value.equals(operand);
+      return equals;
+    } else {
+      throw new IllegalStateException(filterOp + " not yet implemented");
+    }
+  }
+
+  public String getFieldName() {
+    return fieldName;
+  }
+
+  public void setFieldName(String fieldName) {
+    this.fieldName = fieldName;
+  }
+
+  public Utf8 getMapKey() {
+    return mapKey;
+  }
+
+  public void setMapKey(Utf8 mapKey) {
+    this.mapKey = mapKey;
+  }
+
+  public FilterOp getFilterOp() {
+    return filterOp;
+  }
+
+  public void setFilterOp(FilterOp filterOp) {
+    this.filterOp = filterOp;
+  }
+
+  public List<Object> getOperands() {
+    return operands;
+  }
+
+  public void setOperands(List<Object> operands) {
+    this.operands = operands;
+  }
+
+  public void setFilterIfMissing(boolean filterIfMissing) {
+    this.filterIfMissing = filterIfMissing;
+  }
+
+  public boolean isFilterIfMissing() {
+    return filterIfMissing;
+  }
+
+  @Override
+  public String toString() {
+    // report this class, not SingleFieldValueFilter
+    return "MapFieldValueFilter [fieldName=" + fieldName + ",mapKey=" + mapKey + ", filterOp=" + filterOp + ", operands=" + operands
+        + ", filterIfMissing=" + filterIfMissing + "]";
+  }
+}
Added: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/filter/SingleFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A filter that checks for a single field in the persistent.
+ *
+ * @param <K>
+ * @param <T>
+ */
+public class SingleFieldValueFilter<K, T extends PersistentBase> implements Filter<K, T>{
+
+ protected String fieldName;
+ protected FilterOp filterOp;
+ protected List<Object> operands = new ArrayList<Object>();
+ protected boolean filterIfMissing = false;
+
+ private Configuration conf = new Configuration(); //just create empty conf,
+ //needed for ObjectWritable
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, fieldName);
+ WritableUtils.writeEnum(out, filterOp);
+ WritableUtils.writeVInt(out, operands.size());
+ for (int i = 0; i < operands.size(); i++) {
+ Object operand = operands.get(i);
+ if (operand instanceof String) {
+ throw new IllegalStateException("Use Utf8 instead of String for operands");
+ }
+ if (operand instanceof Utf8) {
+ operand=operand.toString();
+ }
+ if (operand instanceof Boolean) {
+ ObjectWritable.writeObject(out, operand, Boolean.TYPE, conf);
+ } else if (operand instanceof Character) {
+ ObjectWritable.writeObject(out, operand, Character.TYPE, conf);
+ } else if (operand instanceof Byte) {
+ ObjectWritable.writeObject(out, operand, Byte.TYPE, conf);
+ } else if (operand instanceof Short) {
+ ObjectWritable.writeObject(out, operand, Short.TYPE, conf);
+ } else if (operand instanceof Integer) {
+ ObjectWritable.writeObject(out, operand, Integer.TYPE, conf);
+ } else if (operand instanceof Long) {
+ ObjectWritable.writeObject(out, operand, Long.TYPE, conf);
+ } else if (operand instanceof Float) {
+ ObjectWritable.writeObject(out, operand, Float.TYPE, conf);
+ } else if (operand instanceof Double) {
+ ObjectWritable.writeObject(out, operand, Double.TYPE, conf);
+ } else if (operand instanceof Void) {
+ ObjectWritable.writeObject(out, operand, Void.TYPE, conf);
+ } else {
+ ObjectWritable.writeObject(out, operand, operand.getClass(), conf);
+ }
+ }
+ out.writeBoolean(filterIfMissing);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ fieldName = Text.readString(in);
+ filterOp = WritableUtils.readEnum(in, FilterOp.class);
+ operands.clear();
+ int operandsSize = WritableUtils.readVInt(in);
+ for (int i = 0; i < operandsSize; i++) {
+ Object operand = ObjectWritable.readObject(in, conf);
+ if (operand instanceof String) {
+ operand=new Utf8((String) operand);
+ }
+ operands.add(operand);
+ }
+ filterIfMissing = in.readBoolean();
+ }
+
+ @Override
+ public boolean filter(K key, T persistent) {
+ int fieldIndex = persistent.getSchema().getField(fieldName).pos(); //.getIndexNamed(fieldName); throws org.apache.avro.AvroRuntimeException: Not a union:
+ Object fieldValue = persistent.get(fieldIndex);
+ Object operand = operands.get(0);
+ if (fieldValue == null) {
+ return filterIfMissing;
+ }
+ if (filterOp.equals(FilterOp.EQUALS)) {
+ boolean equals = operand.equals(fieldValue);
+ return !equals;
+ } else if (filterOp.equals(FilterOp.NOT_EQUALS)) {
+ boolean equals = operand.equals(fieldValue);
+ return equals;
+ // TODO implement other FilterOp operators
+ } else {
+ throw new IllegalStateException(filterOp + " not yet implemented");
+ }
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+ public void setFieldName(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ public FilterOp getFilterOp() {
+ return filterOp;
+ }
+ public void setFilterOp(FilterOp filterOp) {
+ this.filterOp = filterOp;
+ }
+
+ public List<Object> getOperands() {
+ return operands;
+ }
+ public void setOperands(List<Object> operands) {
+ this.operands = operands;
+ }
+
+ public void setFilterIfMissing(boolean filterIfMissing) {
+ this.filterIfMissing = filterIfMissing;
+ }
+
+ public boolean isFilterIfMissing() {
+ return filterIfMissing;
+ }
+
+ @Override
+ public String toString() {
+ return "SingleFieldValueFilter [fieldName=" + fieldName + ", filterOp="
+ + filterOp + ", operands=" + operands + ", filterIfMissing="
+ + filterIfMissing + "]";
+ }
+}
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/Query.java Wed Jan 29 22:02:59 2014
@@ -18,8 +18,7 @@
package org.apache.gora.query;
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
@@ -71,6 +70,30 @@ public interface Query<K, T extends Pers
void setFields(String... fieldNames);
String[] getFields();
+
+ /**
+ * @param filter The filter to set on this query.
+ */
+ public void setFilter(Filter<K, T> filter);
+
+ /**
+ * @return The filter on this query, or <code>null</code> if none.
+ */
+ public Filter<K, T> getFilter();
+
+ /**
+ * Set whether the local filter is enabled. This is usually called by
+ * data store implementations that install the filter remotely
+ * (for efficiency reasons) and therefore disable the local filter.
+ * @param enable
+ */
+ void setLocalFilterEnabled(boolean enable);
+
+ /**
+ * @return Whether the local filter is enabled.
+ * See {@link #setLocalFilterEnabled(boolean)}.
+ */
+ boolean isLocalFilterEnabled();
/* Dimension : key */
void setKey(K key);
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/PartitionQueryImpl.java Wed Jan 29 22:02:59 2014
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
@@ -127,6 +128,16 @@ public String[] getLocations() {
public void setLimit(long limit) {
baseQuery.setLimit(limit);
}
+
+ @Override
+ public Filter<K, T> getFilter() {
+ return baseQuery.getFilter();
+ }
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ baseQuery.setFilter(filter);
+ }
@Override
public void write(DataOutput out) throws IOException {
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/QueryBase.java Wed Jan 29 22:02:59 2014
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
@@ -56,7 +57,8 @@ public abstract class QueryBase<K, T ext
protected long startTime = -1;
protected long endTime = -1;
- protected String filter;
+ protected Filter<K, T> filter;
+ protected boolean localFilterEnabled=true;
protected long limit = -1;
@@ -81,16 +83,6 @@ public abstract class QueryBase<K, T ext
return dataStore;
}
-// @Override
-// public void setQueryString(String queryString) {
-// this.queryString = queryString;
-// }
-//
-// @Override
-// public String getQueryString() {
-// return queryString;
-// }
-
@Override
public void setFields(String... fields) {
this.fields = fields;
@@ -100,7 +92,27 @@ public abstract class QueryBase<K, T ext
public String[] getFields() {
return fields;
}
-
+
+ @Override
+ public Filter<K, T> getFilter() {
+ return filter;
+ }
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ this.filter=filter;
+ }
+
+ @Override
+ public boolean isLocalFilterEnabled() {
+ return localFilterEnabled;
+ }
+
+ @Override
+ public void setLocalFilterEnabled(boolean enable) {
+ this.localFilterEnabled=enable;
+ }
+
@Override
public void setKey(K key) {
setKeyRange(key, key);
@@ -176,16 +188,6 @@ public String[] getFields() {
return endTime;
}
-// @Override
-// public void setFilter(String filter) {
-// this.filter = filter;
-// }
-//
-// @Override
-// public String getFilter() {
-// return filter;
-// }
-
@Override
public void setLimit(long limit) {
this.limit = limit;
@@ -225,12 +227,20 @@ public String[] getFields() {
startKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
if(!nullFields[3])
endKey = IOUtils.deserialize(getConf(), in, null, dataStore.getKeyClass());
- if(!nullFields[4])
- filter = Text.readString(in);
+ if(!nullFields[4]) {
+ String filterClass = Text.readString(in);
+ try {
+ filter = (Filter<K, T>) ReflectionUtils.newInstance(ClassLoadingUtils.loadClass(filterClass), conf);
+ filter.readFields(in);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
startTime = WritableUtils.readVLong(in);
endTime = WritableUtils.readVLong(in);
limit = WritableUtils.readVLong(in);
+ localFilterEnabled = in.readBoolean();
}
//@Override
@@ -250,12 +260,15 @@ public String[] getFields() {
IOUtils.serialize(getConf(), out, startKey, dataStore.getKeyClass());
if(endKey != null)
IOUtils.serialize(getConf(), out, endKey, dataStore.getKeyClass());
- if(filter != null)
- Text.writeString(out, filter);
+ if(filter != null) {
+ Text.writeString(out, filter.getClass().getCanonicalName());
+ filter.write(out);
+ }
WritableUtils.writeVLong(out, getStartTime());
WritableUtils.writeVLong(out, getEndTime());
WritableUtils.writeVLong(out, getLimit());
+ out.writeBoolean(localFilterEnabled);
}
@SuppressWarnings({ "rawtypes" })
@@ -271,6 +284,7 @@ public String[] getFields() {
builder.append(endKey, that.endKey);
builder.append(filter, that.filter);
builder.append(limit, that.limit);
+ builder.append(localFilterEnabled, that.localFilterEnabled);
return builder.isEquals();
}
return false;
@@ -286,6 +300,7 @@ public String[] getFields() {
builder.append(endKey);
builder.append(filter);
builder.append(limit);
+ builder.append(localFilterEnabled);
return builder.toHashCode();
}
@@ -298,6 +313,7 @@ public String[] getFields() {
builder.append("endKey", endKey);
builder.append("filter", filter);
builder.append("limit", limit);
+ builder.append("localFilterEnabled", localFilterEnabled);
return builder.toString();
}
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/impl/ResultBase.java Wed Jan 29 22:02:59 2014
@@ -18,13 +18,14 @@
package org.apache.gora.query.impl;
-import java.io.IOException;
-
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
+import java.io.IOException;
+
/**
* Base class for {@link Result} implementations.
*/
@@ -102,17 +103,37 @@ public abstract class ResultBase<K, T ex
@Override
public final boolean next() throws Exception, IOException {
- if(isLimitReached()) {
- return false;
- }
-
- clear();
+ if(isLimitReached()) {
+ return false;
+ }
+
+ boolean ret;
+ do {
+ clear();
+ persistent = getOrCreatePersistent(persistent);
+ ret = nextInner();
+ if (ret == false) {
+ //this is the end
+ break;
+ }
+ //we keep looping until we get a row that is not filtered out
+ } while (filter(key, persistent));
+
+ if(ret) ++offset;
+ return ret;
+ }
+
+ protected boolean filter(K key, T persistent) {
+ if (!query.isLocalFilterEnabled()) {
+ return false;
+ }
+
+ Filter<K, T> filter = query.getFilter();
+ if (filter == null) {
+ return false;
+ }
- persistent = getOrCreatePersistent(persistent);
- boolean ret = nextInner();
-
- if(ret) ++offset;
- return ret;
+ return filter.filter(key, persistent);
}
@Override
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/PartitionWSQueryImpl.java Wed Jan 29 22:02:59 2014
@@ -18,15 +18,16 @@
package org.apache.gora.query.ws.impl;
-import java.util.Arrays;
-
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.store.DataStore;
+import java.util.Arrays;
+
/**
- * Implementation for {@link PartitionQuery}.
+ * Webservices implementation for {@link PartitionQuery}.
*/
//TODO this class should be reviewed when a web service backed datastore has the
// ability to write partition queries
@@ -182,6 +183,27 @@ public class PartitionWSQueryImpl<K, T e
public void setLimit(long limit) {
baseQuery.setLimit(limit);
}
+
+ @Override
+ public Filter<K, T> getFilter() {
+ return filter;
+ }
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ this.filter=filter;
+ }
+
+ @Override
+ public boolean isLocalFilterEnabled() {
+ return localFilterEnabled;
+ }
+
+ @Override
+ public void setLocalFilterEnabled(boolean enable) {
+ this.localFilterEnabled=enable;
+ }
+
@Override
@SuppressWarnings({ "rawtypes" })
Modified: gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java (original)
+++ gora/branches/GORA_94/gora-core/src/main/java/org/apache/gora/query/ws/impl/QueryWSBase.java Wed Jan 29 22:02:59 2014
@@ -21,6 +21,7 @@ package org.apache.gora.query.ws.impl;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
@@ -58,11 +59,8 @@ public abstract class QueryWSBase<K, T e
protected long startTime = -1;
protected long endTime = -1;
- /**
- * Query filter
- */
- protected String filter;
-
+ protected Filter<K, T> filter;
+ protected boolean localFilterEnabled=true;
/**
* Max number of results to be retrieved
*/
Added: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestMapFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+public class TestMapFieldValueFilter {
+
+ @Test
+ public void testSerialization() throws IOException {
+ MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+ //set filter field name as metadata
+ filter.setFieldName(WebPage.Field.METADATA.toString());
+ filter.setMapKey(new Utf8("fetchTime"));
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.getOperands().add(new Utf8("http://example.org"));
+ byte[] byteArray = WritableUtils.toByteArray(filter);
+ MapFieldValueFilter<String, WebPage> filter2 = new MapFieldValueFilter<String, WebPage>();
+ filter2.readFields(new DataInputStream(new ByteArrayInputStream(byteArray)));
+ assertEquals(filter.getFieldName(), filter2.getFieldName());
+ assertEquals(filter.getMapKey(), filter2.getMapKey());
+ assertEquals(filter.getFilterOp(), filter2.getFilterOp());
+ assertArrayEquals(filter.getOperands().toArray(), filter2.getOperands().toArray());
+ assertEquals(filter.isFilterIfMissing(), filter2.isFilterIfMissing());
+ }
+
+ @Test
+ public void testFilterBasics() {
+ MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+ //set filter field name as outlinks
+ filter.setFieldName(WebPage.Field.OUTLINKS.toString());
+ filter.setMapKey(new Utf8("example"));
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.getOperands().add(new Utf8("http://example.org"));
+
+ WebPage page = WebPage.newBuilder().build();
+ page.getOutlinks().put(new Utf8("example"), new Utf8("http://example.org"));
+ assertFalse(filter.filter("irrelevant", page));
+ page.getOutlinks().put(new Utf8("example"), new Utf8("http://example2.com"));
+ assertTrue(filter.filter("irrelevant", page));
+ page = new WebPage();
+ assertTrue(filter.filter("irrelevant", page));
+ filter.setFilterIfMissing(false);
+
+ assertFalse(filter.filter("irrelevant", page));
+ }
+
+ @Test
+ public void testFilterEntryInMap() {
+ MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+ //set filter field name as outlinks
+ filter.setFieldName(WebPage.Field.OUTLINKS.toString());
+ filter.setMapKey(new Utf8("foobar.whatever"));
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.getOperands().add(new Utf8("Click here for foobar!"));
+
+ WebPage page = WebPage.newBuilder().build();
+ assertTrue(filter.filter("irrelevant", page));
+ page.getOutlinks().put(new Utf8("foobar.whatever"), new Utf8("Mismatch!"));
+ assertTrue(filter.filter("irrelevant", page));
+ page.getOutlinks().put(new Utf8("foobar.whatever"), new Utf8("Click here for foobar!"));
+ assertFalse(filter.filter("irrelevant", page));
+ }
+
+}
Added: gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java (added)
+++ gora/branches/GORA_94/gora-core/src/test/java/org/apache/gora/filter/TestSingleFieldValueFilter.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.filter;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Test;
+
+public class TestSingleFieldValueFilter {
+
+ @Test
+ public void testSerialization() throws IOException {
+ SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String, WebPage>();
+ //set filter field to url
+ filter.setFieldName(WebPage.Field.URL.toString());
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.getOperands().add(new Utf8("http://example.org"));
+ byte[] byteArray = WritableUtils.toByteArray(filter);
+ SingleFieldValueFilter<String, WebPage> filter2 = new SingleFieldValueFilter<String, WebPage>();
+ filter2.readFields(new DataInputStream(new ByteArrayInputStream(byteArray)));
+ assertEquals(filter.getFieldName(), filter2.getFieldName());
+ assertEquals(filter.getFilterOp(), filter2.getFilterOp());
+ assertArrayEquals(filter.getOperands().toArray(), filter2.getOperands().toArray());
+ assertEquals(filter.isFilterIfMissing(), filter2.isFilterIfMissing());
+ }
+
+ @Test
+ public void testFilterBasics() {
+ SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String, WebPage>();
+ //set filter field to url
+ filter.setFieldName(WebPage.Field.URL.toString());
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.getOperands().add(new Utf8("example.org"));
+
+ WebPage page = WebPage.newBuilder().build();
+ page.setUrl(new Utf8("example.org"));
+ assertFalse(filter.filter("irrelevant", page));
+ page.setUrl(new Utf8("mismatch.whatever"));
+ assertTrue(filter.filter("irrelevant", page));
+ page = new WebPage();
+ assertTrue(filter.filter("irrelevant", page));
+
+ filter.setFilterIfMissing(false);
+
+ assertFalse(filter.filter("irrelevant", page));
+ }
+
+ @Test
+ public void testFilterInequals() {
+ SingleFieldValueFilter<String, WebPage> filter = new SingleFieldValueFilter<String, WebPage>();
+ //set filter field to url
+ filter.setFieldName(WebPage.Field.URL.toString());
+ filter.setFilterOp(FilterOp.NOT_EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.getOperands().add(new Utf8("example.org"));
+
+ WebPage page = WebPage.newBuilder().build();
+ page.setUrl(new Utf8("example.org"));
+ assertTrue(filter.filter("irrelevant", page));
+
+ page.setUrl(new Utf8("something.else"));
+ assertFalse(filter.filter("irrelevant", page));
+ }
+
+}
Modified: gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java (original)
+++ gora/branches/GORA_94/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java Wed Jan 29 22:02:59 2014
@@ -21,6 +21,7 @@ package org.apache.gora.dynamodb.query;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import org.apache.gora.filter.Filter;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.query.ws.impl.QueryWSBase;
@@ -377,4 +378,28 @@ public class DynamoDBQuery<K, T extends
public static void setRangeCompOp(ComparisonOperator pRangeCompOp){
rangeCompOp = pRangeCompOp;
}
+
+ @Override
+ public void setFilter(Filter<K, T> filter) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public Filter<K, T> getFilter() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setLocalFilterEnabled(boolean enable) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isLocalFilterEnabled() {
+ // TODO Auto-generated method stub
+ return false;
+ }
}
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/query/HBaseQuery.java Wed Jan 29 22:02:59 2014
@@ -18,7 +18,6 @@
package org.apache.gora.hbase.query;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.QueryBase;
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseColumn.java Wed Jan 29 22:02:59 2014
@@ -22,7 +22,7 @@ import java.util.Arrays;
/**
* Store family, qualifier tuple
*/
-class HBaseColumn {
+public class HBaseColumn {
final byte[] family;
final byte[] qualifier;
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/store/HBaseStore.java Wed Jan 29 22:02:59 2014
@@ -42,6 +42,7 @@ import org.apache.gora.hbase.query.HBase
import org.apache.gora.hbase.query.HBaseScannerResult;
import org.apache.gora.hbase.store.HBaseMapping.HBaseMappingBuilder;
import org.apache.gora.hbase.util.HBaseByteInterface;
+import org.apache.gora.hbase.util.HBaseFilterUtil;
import org.apache.gora.persistency.impl.DirtyListWrapper;
import org.apache.gora.persistency.impl.DirtyMapWrapper;
import org.apache.gora.persistency.impl.PersistentBase;
@@ -98,6 +99,8 @@ implements Configurable {
private final boolean autoCreateSchema = true;
private volatile HBaseMapping mapping;
+
+ private HBaseFilterUtil<K, T> filterUtil;
private int scannerCaching = SCANNER_CACHING_PROPERTIES_DEFAULT ;
@@ -113,7 +116,7 @@ implements Configurable {
this.conf = HBaseConfiguration.create(getConf());
admin = new HBaseAdmin(this.conf);
mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
-
+ filterUtil = new HBaseFilterUtil<K, T>(this.conf);
} catch (FileNotFoundException ex) {
try {
mapping = readMapping(getConf().get(PARSE_MAPPING_FILE_KEY, DEPRECATED_MAPPING_FILE));
@@ -160,6 +163,10 @@ implements Configurable {
//return the name of this table
return mapping.getTableName();
}
+
+ public HBaseMapping getMapping() {
+ return mapping;
+ }
@Override
public void createSchema() {
@@ -387,8 +394,7 @@ implements Configurable {
}
List<PartitionQuery<K,T>> partitions = new ArrayList<PartitionQuery<K,T>>(keys.getFirst().length);
for (int i = 0; i < keys.getFirst().length; i++) {
- String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
- getServerAddress().getHostname();
+ String regionLocation = table.getRegionLocation(keys.getFirst()[i]).getServerAddress().getHostname();
byte[] startRow = query.getStartKey() != null ? toBytes(query.getStartKey())
: HConstants.EMPTY_START_ROW;
byte[] stopRow = query.getEndKey() != null ? toBytes(query.getEndKey())
@@ -440,7 +446,7 @@ implements Configurable {
ResultScanner scanner = createScanner(query);
org.apache.gora.query.Result<K,T> result
- = new HBaseScannerResult<K,T>(this,query, scanner);
+ = new HBaseScannerResult<K,T>(this, query, scanner);
return result;
}
@@ -463,6 +469,13 @@ implements Configurable {
scan.setStopRow(toBytes(query.getEndKey()));
}
addFields(scan, query);
+ if (query.getFilter() != null) {
+ boolean succeeded = filterUtil.setFilter(scan, query.getFilter(), this);
+ if (succeeded) {
+ // don't need local filter
+ query.setLocalFilterEnabled(false);
+ }
+ }
return table.getScanner(scan);
}
Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/BaseFactory.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+
+public abstract class BaseFactory<K, T extends PersistentBase> implements FilterFactory<K, T> {
+
+ private HBaseFilterUtil<K, T> util;
+
+ @Override
+ public HBaseFilterUtil<K, T> getHbaseFitlerUtil() {
+ return util;
+ }
+
+ @Override
+ public void setHBaseFitlerUtil(HBaseFilterUtil<K, T> util) {
+ this.util = util;
+ }
+
+}
Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/DefaultFactory.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.FilterList;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.filter.MapFieldValueFilter;
+import org.apache.gora.filter.SingleFieldValueFilter;
+import org.apache.gora.hbase.store.HBaseColumn;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DefaultFactory <K, T extends PersistentBase> extends BaseFactory<K, T> {
+ private static final Log LOG = LogFactory.getLog(DefaultFactory.class);
+
+ @Override
+ public List<String> getSupportedFilters() {
+ List<String> filters = new ArrayList<String>();
+ filters.add(SingleFieldValueFilter.class.getCanonicalName());
+ filters.add(MapFieldValueFilter.class.getCanonicalName());
+ filters.add(FilterList.class.getCanonicalName());
+ return filters;
+ }
+
+ @Override
+ public org.apache.hadoop.hbase.filter.Filter createFilter(Filter<K, T> filter, HBaseStore<K, T> store) {
+ if (filter instanceof FilterList) {
+ FilterList<K, T> filterList = (FilterList<K, T>) filter;
+ org.apache.hadoop.hbase.filter.FilterList hbaseFilter = new org.apache.hadoop.hbase.filter.FilterList(
+ Operator.valueOf(filterList.getOperator().name()));
+ for (Filter<K, T> rowFitler : filterList.getFilters()) {
+ FilterFactory<K, T> factory = getHbaseFitlerUtil().getFactory(rowFitler);
+ if (factory == null) {
+ LOG.warn("HBase remote filter factory not yet implemented for " + rowFitler.getClass().getCanonicalName());
+ return null;
+ }
+ org.apache.hadoop.hbase.filter.Filter hbaseRowFilter = factory.createFilter(rowFitler, store);
+ if (hbaseRowFilter != null) {
+ hbaseFilter.addFilter(hbaseRowFilter);
+ }
+ }
+ return hbaseFilter;
+ } else if (filter instanceof SingleFieldValueFilter) {
+ SingleFieldValueFilter<K, T> fieldFilter = (SingleFieldValueFilter<K, T>) filter;
+
+ HBaseColumn column = store.getMapping().getColumn(fieldFilter.getFieldName());
+ CompareOp compareOp = getCompareOp(fieldFilter.getFilterOp());
+ byte[] family = column.getFamily();
+ byte[] qualifier = column.getQualifier();
+ byte[] value = HBaseByteInterface.toBytes(fieldFilter.getOperands().get(0));
+ SingleColumnValueFilter hbaseFilter = new SingleColumnValueFilter(family, qualifier, compareOp, value);
+ hbaseFilter.setFilterIfMissing(fieldFilter.isFilterIfMissing());
+
+ return hbaseFilter;
+ } else if (filter instanceof MapFieldValueFilter) {
+ MapFieldValueFilter<K, T> mapFilter = (MapFieldValueFilter<K, T>) filter;
+
+ HBaseColumn column = store.getMapping().getColumn(mapFilter.getFieldName());
+ CompareOp compareOp = getCompareOp(mapFilter.getFilterOp());
+ byte[] family = column.getFamily();
+ byte[] qualifier = HBaseByteInterface.toBytes(mapFilter.getMapKey());
+ byte[] value = HBaseByteInterface.toBytes(mapFilter.getOperands().get(0));
+ SingleColumnValueFilter hbaseFilter = new SingleColumnValueFilter(family, qualifier, compareOp, value);
+ hbaseFilter.setFilterIfMissing(mapFilter.isFilterIfMissing());
+
+ return hbaseFilter;
+ } else {
+ LOG.warn("HBase remote filter not yet implemented for " + filter.getClass().getCanonicalName());
+ return null;
+ }
+ }
+
+ private CompareOp getCompareOp(FilterOp filterOp) {
+ switch (filterOp) {
+ case EQUALS:
+ return CompareOp.EQUAL;
+ case NOT_EQUALS:
+ return CompareOp.NOT_EQUAL;
+ case LESS:
+ return CompareOp.LESS;
+ case LESS_OR_EQUAL:
+ return CompareOp.LESS_OR_EQUAL;
+ case GREATER:
+ return CompareOp.GREATER;
+ case GREATER_OR_EQUAL:
+ return CompareOp.GREATER_OR_EQUAL;
+ default:
+ throw new IllegalArgumentException(filterOp + " no HBase equivalent yet");
+ }
+ }
+
+
+}
Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/FilterFactory.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gora.hbase.util;
+
+import org.apache.gora.filter.Filter;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+
+import java.util.List;
+
+public interface FilterFactory <K, T extends PersistentBase> {
+ void setHBaseFitlerUtil(HBaseFilterUtil<K, T> util);
+ HBaseFilterUtil<K, T> getHbaseFitlerUtil();
+ List<String> getSupportedFilters();
+ org.apache.hadoop.hbase.filter.Filter createFilter(Filter<K, T> filter, HBaseStore<K, T> store);
+}
Modified: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java?rev=1562607&r1=1562606&r2=1562607&view=diff
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java (original)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseByteInterface.java Wed Jan 29 22:02:59 2014
@@ -205,6 +205,8 @@ public class HBaseByteInterface {
return Bytes.toBytes((String) o);
} else if (clazz.equals(Utf8.class)) {
return ((Utf8) o).getBytes();
+ } else if (clazz.isArray() && clazz.getComponentType().equals(Byte.TYPE)) {
+ return (byte[])o;
}
throw new RuntimeException("Can't parse data as class: " + clazz);
}
Added: gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java
URL: http://svn.apache.org/viewvc/gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java?rev=1562607&view=auto
==============================================================================
--- gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java (added)
+++ gora/branches/GORA_94/gora-hbase/src/main/java/org/apache/gora/hbase/util/HBaseFilterUtil.java Wed Jan 29 22:02:59 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.hbase.store.HBaseStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class HBaseFilterUtil<K, T extends PersistentBase> {
+ private static final Log LOG = LogFactory.getLog(HBaseFilterUtil.class);
+
+ private Map<String, FilterFactory<K, T>> factories = new LinkedHashMap<String, FilterFactory<K, T>>();
+
+ public HBaseFilterUtil(Configuration conf) throws GoraException {
+ String[] factoryClassNames = conf.getStrings("gora.hbase.filter.factories", "org.apache.gora.hbase.util.DefaultFactory");
+
+ for (String factoryClass : factoryClassNames) {
+ try {
+ @SuppressWarnings("unchecked")
+ FilterFactory<K, T> factory = (FilterFactory<K, T>) ReflectionUtils.newInstance(factoryClass);
+ for (String filterClass : factory.getSupportedFilters()) {
+ factories.put(filterClass, factory);
+ }
+ factory.setHBaseFitlerUtil(this);
+ } catch (Exception e) {
+ throw new GoraException(e);
+ }
+ }
+ }
+
+ public FilterFactory<K, T> getFactory(Filter<K, T> fitler) {
+ return factories.get(fitler.getClass().getCanonicalName());
+ }
+
+ /**
+ * Sets a filter on the Scan by translating the Gora filter into an HBase filter.
+ *
+ * @param scan
+ * @param filter
+ * The Gora filter.
+ * @param store
+ * The HBaseStore.
+ * @return {@code true} if the remote filter was successfully applied to the scan; {@code false} otherwise.
+ */
+ public boolean setFilter(Scan scan, Filter<K, T> filter, HBaseStore<K, T> store) {
+
+ FilterFactory<K, T> factory = getFactory(filter);
+ if (factory != null) {
+ org.apache.hadoop.hbase.filter.Filter hbaseFilter = factory.createFilter(filter, store);
+ if (hbaseFilter != null) {
+ scan.setFilter(hbaseFilter);
+ return true;
+ } else {
+ LOG.warn("HBase remote filter not yet implemented for " + filter.getClass().getCanonicalName());
+ return false;
+ }
+ } else {
+ LOG.warn("HBase remote filter factory not yet implemented for " + filter.getClass().getCanonicalName());
+ return false;
+ }
+ }
+
+}