You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:23 UTC

[16/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
new file mode 100755
index 0000000..8209445
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.EntityQualifierUtils;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.query.parser.*;
+import org.apache.hadoop.hbase.filter.*;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Builds HBase filters from a parsed query expression. The steps are:
+ * 1. receive an ORExpression from eagle-antlr
+ * 2. iterate over every ANDExpression in the ORExpression
+ *    2.1 add the filter list built for each ANDExpression to an outer filter list with the MUST_PASS_ONE option
+ *    2.2 iterate over every AtomicExpression in the ANDExpression
+ *       2.2.1 split the AtomicExpressions into 2 groups by looking up metadata: one for tag filters, the other for column (qualifier) filters
+ *       2.2.2 put the filters built from both groups into a filter list with the MUST_PASS_ALL option
+ */
+public class HBaseFilterBuilder {
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class);
+	
+	/**
+	 * Pattern applied to field references in the query; the expected syntax is @<fieldname>.
+	 * Group 1 of a match is used as the attribute name (see parseEntityAttribute).
+	 */
+//	Previously compiled locally: private static final String fnRegex = "^@(.*)$";
+	private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// Pattern.compile(fnRegex);
+	// Charset used for regex comparators unless setCharset(String) overrides it.
+	private static final Charset _defaultCharset = Charset.forName("ISO-8859-1");
+
+	// Parsed query: a list of AND expressions combined with OR.
+	private ORExpression _orExpr;
+	// Metadata of the entity being queried (tags, partitions, qualifiers, column family).
+	private EntityDefinition _ed;
+	// Passed to SingleColumnValueFilter.setFilterIfMissing for every column filter that is built.
+	private boolean _filterIfMissing;
+	// Charset handed to the RegexStringComparators created by this builder.
+	private Charset _charset = _defaultCharset;
+
+	/**
+	 * Returns the field names collected from expression-based filters while the filters were built.
+	 *
+	 * TODO: Verify performance impact
+	 *
+	 * @return the internal set of field names; null if no expression-based filter has been built,
+	 *         because the backing field is only assigned when such a filter is created
+	 */
+	public Set<String> getFilterFields() {
+		return _filterFields;
+	}
+
+	/**
+	 * Field names required by expression-based filters. Not initialized here; it is assigned in
+	 * buildExpressionBasedFilter and stays null until then.
+	 */
+	private Set<String> _filterFields;
+
+	/**
+	 * Creates a builder with filterIfMissing set to false.
+	 *
+	 * @param ed     entity definition of the entity being queried
+	 * @param orExpr parsed query expression
+	 */
+	public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) {
+		this(ed, orExpr, false);
+	}
+	
+	/**
+	 * Creates a builder.
+	 *
+	 * @param ed              entity definition of the entity being queried
+	 * @param orExpr          parsed query expression
+	 * @param filterIfMissing value passed to setFilterIfMissing on every SingleColumnValueFilter this builder creates
+	 */
+	public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) {
+		this._ed = ed;
+		this._orExpr = orExpr;
+		this._filterIfMissing = filterIfMissing;
+	}
+	
+	/**
+	 * Replaces the charset that is handed to the RegexStringComparators created by this builder.
+	 *
+	 * @param charsetName name resolved through {@link Charset#forName(String)}, which throws an
+	 *                    unchecked exception for an illegal or unsupported name
+	 */
+	public void setCharset(String charsetName){
+		_charset = Charset.forName(charsetName);
+	}
+	
+	/**
+	 * @return the charset currently used for regex comparators (ISO-8859-1 unless changed via setCharset)
+	 */
+	public Charset getCharset(){
+		return _charset;
+	}
+	
+	/**
+	 * Tells whether the entity definition regards the field as a tag.
+	 *
+	 * Because there is no metadata for tags, a non-qualifier field is regarded as a tag. A field may
+	 * therefore not be a real tag even when this method returns true; this happens when a user enters
+	 * a wrong field name that is neither a tag nor a qualifier.
+	 *
+	 * @param field attribute name, already stripped of the @ syntax
+	 * @return the result of {@code _ed.isTag(field)}
+	 */
+	private boolean isTag(String field){
+		return _ed.isTag(field);
+	}
+	
+	/**
+	 * check whether this field is one entity attribute or not 
+	 * @param fieldName
+	 * @return
+	 */
+	private String parseEntityAttribute(String fieldName){
+		Matcher m = _fnPattern.matcher(fieldName);
+		if(m.find()){
+			return m.group(1);
+		}
+		return null;
+	}
+
+	/**
+	 * Returns the partition values for each AND expression of the OR expression. The size of the
+	 * returned list equals the number of AND expressions, i.e. the number of filters in the
+	 * FilterList that {@link #buildFilters()} returns.
+	 *
+	 * Only conditions of the form {@code @partitionTag = value} contribute a value; a partition
+	 * without such a condition gets a null entry at its position in the array.
+	 *
+	 * TODO: For now we don't support one query to query multiple partitions. In future if partition is defined
+	 * for the entity, internally we need to spawn multiple queries and send one query for each partition.
+	 *
+	 * @return one String[] per AND expression, indexed like {@code _ed.getPartitions()}; null if the
+	 *         entity does not define any partition
+	 */
+	public List<String[]> getPartitionValues() {
+		final String[] partitions = _ed.getPartitions();
+		if (partitions == null || partitions.length == 0) {
+			return null;
+		}
+		final List<String[]> result = new ArrayList<String[]>();
+		final Map<String, String> partitionKeyValueMap = new HashMap<String, String>();
+		for(ANDExpression andExpr : _orExpr.getANDExprList()) {
+			// The map is reused across AND expressions, so start every round empty
+			partitionKeyValueMap.clear();
+			for(AtomicExpression ae : andExpr.getAtomicExprList()) {
+				// TODO temporarily ignore those fields which are not for attributes
+				if(ae.getKeyType() != TokenType.ID) {
+					continue;
+				}
+				final String rawKey = ae.getKey();
+				final String fieldName = parseEntityAttribute(rawKey);
+				if (fieldName == null) {
+					// Log the raw key: fieldName is null in this branch and would only print "null"
+					LOG.warn(rawKey + " field does not have format @<FieldName>, ignored");
+					continue;
+				}
+				if (_ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) {
+					partitionKeyValueMap.put(fieldName, ae.getValue());
+				}
+			}
+			// Lay the collected values out in the order in which the partitions are declared
+			final String[] values = new String[partitions.length];
+			for (int i = 0; i < partitions.length; ++i) {
+				values[i] = partitionKeyValueMap.get(partitions[i]);
+			}
+			result.add(values);
+		}
+		return result;
+	}
+
+	/**
+	 * Translates the OR expression into an HBase filter list.
+	 *
+	 * The returned list uses MUST_PASS_ONE and holds one MUST_PASS_ALL list per AND expression. Each
+	 * inner list contains a row filter built from the tag conditions (see buildTagFilter) and, when
+	 * there is at least one, the filters built from the remaining conditions (see buildQualifierFilter).
+	 *
+	 * Tag conditions are routed as follows:
+	 * <ul>
+	 *   <li>= / is with a non-NULL value, and in: matched through the row filter</li>
+	 *   <li>like, not like, contains, not contains, is, is not, !=, =, not in: handled as qualifier filters</li>
+	 *   <li>any other operator: rejected with an IllegalArgumentException</li>
+	 * </ul>
+	 * Conditions whose key has the ID token type but does not match the @<FieldName> syntax are
+	 * logged and skipped.
+	 *
+	 * @see org.apache.eagle.query.parser.TokenType
+	 *
+	 * @return the filter list for the whole query, never null
+	 * @throws IllegalArgumentException if a tag field is used with an unsupported operator
+	 */
+	public FilterList buildFilters(){
+		// TODO: Optimize to select between row filter or column filter for better performance
+		// Use row key filter priority by default
+		boolean rowFilterPriority = true;
+
+		FilterList fltList = new FilterList(Operator.MUST_PASS_ONE);
+		for(ANDExpression andExpr : _orExpr.getANDExprList()){
+			
+			FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+			Map<String, List<String>> tagFilters = new HashMap<String, List<String>>();
+			List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>();
+
+			// TODO refactor not to use too much if/else
+			for(AtomicExpression ae : andExpr.getAtomicExprList()){
+				// TODO temporarily ignore those fields which are not for attributes
+
+				final String rawKey = ae.getKey();
+				String fieldName = rawKey;
+				if(ae.getKeyType() == TokenType.ID){
+					fieldName = parseEntityAttribute(rawKey);
+					if(fieldName == null){
+						// Log the raw key: fieldName is null in this branch and would only print "null"
+						LOG.warn(rawKey + " field does not have format @<FieldName>, ignored");
+						continue;
+					}
+				}
+
+				String value = ae.getValue();
+				ComparisonOperator op = ae.getOp();
+				TokenType keyType = ae.getKeyType();
+				TokenType valueType = ae.getValueType();
+				QualifierFilterEntity entry = new QualifierFilterEntity(fieldName,value,op,keyType,valueType);
+
+				// TODO Exact match, need to add escape for those special characters here, including:
+				// "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|"
+
+				if(keyType == TokenType.ID && isTag(fieldName)){
+					if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op))
+							&& !TokenType.NULL.equals(valueType))
+					{
+						// Use RowFilter for equal TAG
+						if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
+						tagFilters.get(fieldName).add(value);
+					} else if (rowFilterPriority && ComparisonOperator.IN.equals(op))
+					{
+						// Use RowFilter here by default
+						if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
+						tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value));
+					} else if (ComparisonOperator.LIKE.equals(op)
+						|| ComparisonOperator.NOT_LIKE.equals(op)
+						|| ComparisonOperator.CONTAINS.equals(op)
+						|| ComparisonOperator.NOT_CONTAINS.equals(op)
+						|| ComparisonOperator.IN.equals(op)
+						|| ComparisonOperator.IS.equals(op)
+						|| ComparisonOperator.IS_NOT.equals(op)
+						|| ComparisonOperator.NOT_EQUAL.equals(op)
+						|| ComparisonOperator.EQUAL.equals(op)
+						|| ComparisonOperator.NOT_IN.equals(op))
+					{
+						// EQUAL / IS only reach this branch with a NULL value; IN only when row filter priority is off
+						qualifierFilters.add(entry);
+					} else
+					{
+						LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName + " yet, going to ignore");
+						throw new IllegalArgumentException("Don't support operation: "+op+" on tag field: "+fieldName+", avaliable options: =, =!, =~, !=~, in, not in, contains, not contains");
+					}
+				}else{
+					qualifierFilters.add(entry);
+				}
+			}
+
+			// Build RowFilter for equal tags (added even when no tag condition was collected)
+			list.addFilter(buildTagFilter(tagFilters));
+
+			// Build SingleColumnValueFilter
+			FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters);
+			if(qualifierFilterList != null && qualifierFilterList.getFilters().size()>0){
+				list.addFilter(qualifierFilterList);
+			}else {
+				if(LOG.isDebugEnabled()) LOG.debug("Ignore empty qualifier filter from "+qualifierFilters.toString());
+			}
+			fltList.addFilter(list);
+		}
+		LOG.info("Query: " + _orExpr.toString() + " => Filter: " + fltList.toString());
+		return fltList;
+	}
+	
+	/**
+	 * _charset is used to decode the byte array, in hbase server, RegexStringComparator uses the same
+	 * charset to decode the byte array stored in qualifier
+	 * for tag filter regex, it's always ISO-8859-1 as it only comes from String's hashcode (Integer)
+	 * Note: regex comparasion is to compare String
+	 */
+	protected Filter buildTagFilter(Map<String, List<String>> tagFilters){
+		RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters));
+		regexStringComparator.setCharset(_charset);
+		RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator);
+		return filter;
+	}
+	
+	/**
+	 * Builds one filter per qualifier condition; all of them must be satisfied (MUST_PASS_ALL).
+	 *
+	 * <H1>Expression-based filter (RowValueFilter) for:</H1>
+	 *      entries whose key type or value type is EXP, or whose key type is not ID
+	 *
+	 * <H1>List of BinaryComparator filters (see buildListQualifierFilter) for:</H1>
+	 *      IN
+	 *      NOT_IN
+	 *
+	 * <H1>Use NullComparator for:</H1>
+	 *      EQUAL, NOT_EQUAL, IS, IS_NOT when the value type is NULL
+	 *
+	 * <H1>Use SubstringComparator for:</H1>
+	 *      CONTAINS
+	 *      NOT_CONTAINS
+	 *
+	 * <H1>Use RegexStringComparator for:</H1>
+	 *      LIKE
+	 *      NOT_LIKE
+	 *
+	 * <H1>Use TypedByteArrayComparator or BinaryComparator for everything else, e.g.:</H1>
+	 *      EQUAL
+	 *      NOT_EQUAL
+	 *      LESS
+	 *      LESS_OR_EQUAL
+	 *      GREATER
+	 *      GREATER_OR_EQUAL
+	 *
+	 * <H2>
+	 *     TODO: Compare performance of RegexStringComparator ,SubstringComparator ,EntityQualifierHelper
+	 * </H2>
+	 *
+	 * @param qualifierFilters conditions to translate
+	 * @return a MUST_PASS_ALL filter list, possibly empty, never null
+	 * @throws IllegalArgumentException if a NULL value is combined with an operator other than =, !=, is, is not
+	 */
+	protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters){
+		FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+		// iterate all the qualifiers
+		for(QualifierFilterEntity entry : qualifierFilters){
+			// if contains expression based filter
+			if(entry.getKeyType() == TokenType.EXP
+					|| entry.getValueType() == TokenType.EXP
+					|| entry.getKeyType() != TokenType.ID){
+				// Only a warning: the expression filter is still added when the coprocessor is disabled
+				if(!EagleConfigFactory.load().isCoprocessorEnabled()) {
+					LOG.warn("Expression in filter may not support, because custom filter and coprocessor is disabled: " + entry.toString());
+				}
+				list.addFilter(buildExpressionBasedFilter(entry));
+				continue;
+			}
+
+			// else using SingleColumnValueFilter
+			// A tag is addressed by its own name; any other field by the qualifier name from the display-name map
+			String qualifierName = entry.getKey();
+			if(!isTag(entry.getKey())){
+				Qualifier qualifier = _ed.getDisplayNameMap().get(entry.getKey());
+				qualifierName = qualifier.getQualifierName();
+			}
+
+			// Comparator to be used for building HBase Filter
+			// WritableByteArrayComparable comparator;
+            ByteArrayComparable comparable;
+			if(ComparisonOperator.IN.equals(entry.getOp())
+				|| ComparisonOperator.NOT_IN.equals(entry.getOp())){
+				// List operators expand into one filter per list element
+				Filter setFilter = buildListQualifierFilter(entry);
+				if(setFilter!=null){
+					list.addFilter(setFilter);
+				}
+			}else{
+				// If [=,!=,is,is not] NULL, use NullComparator else throw exception
+				if(TokenType.NULL.equals(entry.getValueType())){
+					if(ComparisonOperator.EQUAL.equals(entry.getOp())
+						||ComparisonOperator.NOT_EQUAL.equals(entry.getOp())
+						||ComparisonOperator.IS.equals(entry.getOp())
+						||ComparisonOperator.IS_NOT.equals(entry.getOp()))
+                        comparable = new NullComparator();
+					else
+						throw new IllegalArgumentException("Operation: "+entry.getOp()+" with NULL is not supported yet: "+entry.toString()+", avaliable options: [=, !=, is, is not] null|NULL");
+				}
+				// If [contains, not contains],use SubstringComparator
+				else if (ComparisonOperator.CONTAINS.equals(entry.getOp())
+					|| ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) {
+                    comparable = new SubstringComparator(entry.getValue());
+				}
+				// If [like, not like], use RegexStringComparator
+				else if (ComparisonOperator.LIKE.equals(entry.getOp())
+						|| ComparisonOperator.NOT_LIKE.equals(entry.getOp())){
+					// Use RegexStringComparator for LIKE / NOT_LIKE; the value is anchored with ^ and $
+					RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry.getValue()));
+					_comparator.setCharset(_charset);
+                    comparable = _comparator;
+				} else{
+					Class type = EntityQualifierUtils.getType(_ed, entry.getKey());
+					// if type is null (is Tag or not found) or not defined for TypedByteArrayComparator,
+					// or the coprocessor is disabled, fall back to a plain byte-wise comparison
+					if(!EagleConfigFactory.load().isCoprocessorEnabled() || type == null || TypedByteArrayComparator.get(type) == null){
+                        comparable = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()));
+					}else {
+                        comparable = new TypedByteArrayComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()),type);
+					}
+				}
+
+				// The negated operators map to NOT_EQUAL on the same comparator (see convertToHBaseCompareOp)
+				SingleColumnValueFilter filter =
+						new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparable);
+				filter.setFilterIfMissing(_filterIfMissing);
+				list.addFilter(filter);
+			}
+		}
+
+		return list;
+	}
+
+	/**
+	 * Builds a RowValueFilter that evaluates the given condition as a boolean expression.
+	 *
+	 * The fields required by the expression are added to the set returned by getFilterFields().
+	 * They are accumulated rather than assigned, so a query with several expression conditions
+	 * keeps the required fields of all of them instead of only those of the last one built.
+	 *
+	 * @param entry condition containing an expression
+	 * @return filter wrapping a BooleanExpressionComparator for the condition
+	 */
+	private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) {
+		BooleanExpressionComparator expressionComparator = new BooleanExpressionComparator(entry,_ed);
+		Set<String> requiredFields = expressionComparator.getRequiredFields();
+		if (requiredFields != null) {
+			if (_filterFields == null) {
+				// Copy into our own set so the comparator's set is never modified
+				_filterFields = new HashSet<String>();
+			}
+			_filterFields.addAll(requiredFields);
+		}
+		return new RowValueFilter(expressionComparator);
+	}
+
+	/**
+	 * Currently use BinaryComparator only
+	 * <h2>TODO: </h2>
+	 * Possibility to tune performance by using: OR[BinaryComparator,...] instead of RegexStringComparator?
+	 *
+	 *<br/> <br/>
+	 *
+	 * ! Check op must be IN or NOTIN in caller
+	 *
+	 * @param entry
+	 * @return
+	 */
+	private Filter buildListQualifierFilter(QualifierFilterEntity entry){
+		List<String> valueSet = EntityQualifierUtils.parseList(entry.getValue());
+		Iterator<String> it = valueSet.iterator();
+		String fieldName = entry.getKey();
+		String qualifierName = fieldName;
+		if(!_ed.isTag(entry.getKey())){
+			qualifierName = _ed.getDisplayNameMap().get(entry.getKey()).getQualifierName();
+		}
+
+// TODO: Try to use RegExp just work if possible
+// Because single SingleColumnValueFilter is much faster than multi SingleColumnValueFilters in OR list.
+//		Class qualifierType = EntityQualifierHelper.getType(_ed,fieldName);
+//		if( qualifierType == null || qualifierType == String.class){
+//			boolean first = true;
+//			StringBuilder filterRegex = new StringBuilder();
+//			filterRegex.append("^(");
+//			while(it.hasNext()) {
+//				String value = it.next();
+//				if(value == null) {
+//					logger.warn("ignore empty value in set qualifier filter: "+entry.toString());
+//					continue;
+//				}
+//				if(!first) filterRegex.append("|");
+//				filterRegex.append(value);
+//				first = false;
+//			}
+//			filterRegex.append(")$");
+//			RegexStringComparator regexStringComparator = new RegexStringComparator(filterRegex.toString());
+//			return new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(),
+//					convertToHBaseCompareOp(entry.getOp()), regexStringComparator);
+//		}else{
+		FilterList setFilterList;
+		if(ComparisonOperator.IN.equals(entry.getOp())){
+			setFilterList = new FilterList(Operator.MUST_PASS_ONE);
+		}else if(ComparisonOperator.NOT_IN.equals(entry.getOp())) {
+			setFilterList = new FilterList(Operator.MUST_PASS_ALL);
+		}else{
+			throw new IllegalArgumentException(String.format("Don't support operation: %s on LIST type of value yet: %s, valid options: IN/NOT IN [LIST]",entry.getOp(),entry.toString()));
+		}
+
+		while(it.hasNext()) {
+			String value = it.next();
+			BinaryComparator comparator = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, fieldName, value));
+			SingleColumnValueFilter filter =
+					new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparator);
+			filter.setFilterIfMissing(_filterIfMissing);
+			setFilterList.addFilter(filter);
+		}
+
+		return setFilterList;
+//		}
+	}
+
+	/**
+	 * Anchors a LIKE / NOT_LIKE value so that it has to match the whole qualifier value.
+	 * The value is used as a regular expression as is; nothing is escaped.
+	 *
+	 * @param qualifierValue regular expression taken from the query
+	 * @return the value wrapped as {@code ^value$}
+	 */
+	protected String buildQualifierRegex(String qualifierValue){
+		return "^" + qualifierValue + "$";
+	}
+	
+	  /**
+	   * Appends the given ID to the given buffer as a quoted regex literal: "\\Q", then one char
+	   * per byte (each byte taken as an unsigned value), then "\\E".
+	   * A "\E" sequence inside the ID would end the quoted section early, so it is followed by an
+	   * escaped literal \E and a new "\\Q".
+	   * [steal it from opentsdb, thanks opentsdb :) https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java]
+	   *
+	   * @param buf buffer holding the regex under construction
+	   * @param id  raw bytes to match literally
+	   */
+	  private static void addId(final StringBuilder buf, final byte[] id) {
+	    buf.append("\\Q");
+	    boolean backslash = false;
+	    for (final byte b : id) {
+	      buf.append((char) (b & 0xFF));
+	      if (b == 'E' && backslash) {  // If we saw a `\' and now we have a `E'.
+	        // So we just terminated the quoted section because we just added \E
+	        // to `buf'.  So let's put a literal \E now and start quoting again.
+	        buf.append("\\\\E\\Q");
+	        // The backslash has been consumed; without this reset a following 'E'
+	        // (as in `\EE') would wrongly be treated as another terminator.
+	        backslash = false;
+	      } else {
+	        backslash = b == '\\';
+	      }
+	    }
+	    buf.append("\\E");
+	  }
+	  
+	  /**
+	   * String variant of the byte[] overload: appends the given ID to the buffer as a quoted regex
+	   * literal ("\\Q" ... "\\E"), re-opening the quoted section after any "\E" contained in the ID.
+	   *
+	   * @param buf buffer holding the regex under construction
+	   * @param id  text to match literally
+	   */
+	  @SuppressWarnings("unused")
+	  private static void addId(final StringBuilder buf, final String id) {
+	    buf.append("\\Q");
+	    boolean backslash = false;
+	    // Visit every character; stopping at length()-1 silently dropped the last one
+	    for (int i = 0; i < id.length(); i++) {
+	      char c = id.charAt(i);
+	      buf.append(c);
+	      if (c == 'E' && backslash) {  // If we saw a `\' and now we have a `E'.
+	        // So we just terminated the quoted section because we just added \E
+	        // to `buf'.  So let's put a literal \E now and start quoting again.
+	        buf.append("\\\\E\\Q");
+	        // The backslash has been consumed; a following 'E' is an ordinary character
+	        backslash = false;
+	      } else {
+	        backslash = c == '\\';
+	      }
+	    }
+	    buf.append("\\E");
+	  }
+	
+	/**
+	 * Builds the regular expression that the tag row filter applies to the row key.
+	 *
+	 * One search tag may have multiple values which have an OR relationship, and the relationship
+	 * between different search tags is AND.
+	 * The query is like "(TAG1=value11 OR TAG1=value12) AND TAG2=value2".
+	 *
+	 * Tag names and values are represented by the 4 bytes of their String hash codes. Tags that are
+	 * partition fields are skipped. The pattern skips a fixed-length header, then expects each
+	 * requested tag hash followed by one of its value hashes, in ascending order of the tag hash,
+	 * allowing any number of 8-byte groups before, between and after them.
+	 *
+	 * @param tags tag name to accepted values
+	 * @return the regular expression; with no (non-partition) tags it only constrains the row key length
+	 */
+	protected String buildTagFilterRegex(Map<String, List<String>> tags){
+		// TODO need consider that \E could be part of tag, refer to https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java
+		// Sorted by tag hash so the pattern lists tags in ascending hash order
+		// NOTE(review): presumably the row key stores its tags in the same order - confirm against the row key writer
+		final SortedMap<Integer, List<Integer>> tagHash = new TreeMap<Integer, List<Integer>>();
+		final int numOfPartitionFields = (_ed.getPartitions() == null) ? 0 : _ed.getPartitions().length;
+		for(Map.Entry<String, List<String>> entry : tags.entrySet()){
+			String tagName = entry.getKey();
+			// Ignore tag if the tag is one of partition fields
+			if (_ed.isPartitionTag(tagName)) {
+				continue;
+			}
+			List<String> stringValues = entry.getValue();
+			List<Integer> hashValues = new ArrayList<Integer>(stringValues.size());
+			for(String value : stringValues){
+				hashValues.add(value.hashCode());
+			}
+			// NOTE(review): two tag names with the same hash code would overwrite each other here
+			tagHash.put(tagName.hashCode(), hashValues);
+		}
+		
+		// header = prefix(4 bytes) + partition_hashes(4*N bytes) + timestamp (8 bytes)
+		final int headerLength = 4 + numOfPartitionFields * 4 + 8;
+
+		// <tag1:4><value1:4> ... <tagn:4><valuen:4>
+		StringBuilder sb = new StringBuilder();
+		// (?s) turns on DOTALL so that '.' also matches line terminator characters
+		sb.append("(?s)");
+		sb.append("^(?:.{").append(headerLength).append("})");
+		sb.append("(?:.{").append(8).append("})*"); // for any number of tags
+		for (Map.Entry<Integer, List<Integer>> entry : tagHash.entrySet()) {
+			try {
+				// Quoted tag hash followed by an alternation of its quoted value hashes
+				addId(sb, ByteUtil.intToBytes(entry.getKey()));
+				List<Integer> hashValues = entry.getValue();
+				sb.append("(?:");
+				boolean first = true;
+				for(Integer value : hashValues){
+					if(!first){
+						sb.append('|');
+					}
+					addId(sb, ByteUtil.intToBytes(value));
+					first = false;
+				}
+				sb.append(")");
+				sb.append("(?:.{").append(8).append("})*"); // for any number of tags
+			} catch (Exception ex) {
+				// NOTE(review): the error is only logged; whatever was already appended for this tag stays in the pattern
+				LOG.error("constructing regex error", ex);
+			}
+		}
+		sb.append("$");
+		if(LOG.isDebugEnabled()) LOG.debug("Tag filter pattern is " + sb.toString());
+		return sb.toString();
+	}
+
+	/**
+	 * Convert ComparisonOperator to native HBase CompareOp
+	 *
+	 * Support:
+	 *      =, =~,CONTAINS,<,<=,>,>=,!=,!=~
+	 *
+	 * @param comp
+	 * @return
+	 */
+	protected static CompareOp convertToHBaseCompareOp(ComparisonOperator comp) {
+		if(comp == ComparisonOperator.EQUAL || comp == ComparisonOperator.LIKE
+				|| comp == ComparisonOperator.CONTAINS
+				|| comp == ComparisonOperator.IN
+				|| comp == ComparisonOperator.IS
+				) {
+			return CompareOp.EQUAL;
+		}else if(comp == ComparisonOperator.LESS) {
+			return CompareOp.LESS;
+		} else if(comp == ComparisonOperator.LESS_OR_EQUAL){
+			return CompareOp.LESS_OR_EQUAL;
+		}else if(comp == ComparisonOperator.GREATER) {
+			return CompareOp.GREATER;
+		} else if(comp == ComparisonOperator.GREATER_OR_EQUAL){
+			return CompareOp.GREATER_OR_EQUAL;
+		} else if(comp == ComparisonOperator.NOT_EQUAL
+				|| comp == ComparisonOperator.NOT_LIKE
+				|| comp == ComparisonOperator.NOT_CONTAINS
+				|| comp == ComparisonOperator.IS_NOT
+				|| comp == ComparisonOperator.NOT_IN)
+		{
+			return CompareOp.NOT_EQUAL;
+		} else {
+			LOG.error("{} operation is not supported now\n", comp);
+			throw new IllegalArgumentException("Illegal operation: "+comp+ ", avaliable options: "+ Arrays.toString(ComparisonOperator.values()));
+		}
+	}
+
+	protected static CompareOp getHBaseCompareOp(String comp) {
+		return convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
new file mode 100755
index 0000000..6cdc77b
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import org.apache.eagle.query.parser.ComparisonOperator;
+import org.apache.eagle.query.parser.TokenType;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * One atomic filter condition: a key, a comparison operator and a value, together with the token
+ * types of the key and the value. Implements Writable; all five parts are written and read as
+ * UTF strings, the operator and the token types by their constant names.
+ */
+public class QualifierFilterEntity implements Writable{
+	public String key;
+	public String value;
+	public ComparisonOperator op;
+	public TokenType valueType;
+	public TokenType keyType;
+
+	/** Creates an empty entity; the fields are expected to be filled by readFields or the setters. */
+	public QualifierFilterEntity(){}
+
+	/**
+	 * @param key       left-hand side of the condition
+	 * @param value     right-hand side of the condition
+	 * @param comp      comparison operator
+	 * @param keyType   token type of the key
+	 * @param valueType token type of the value
+	 */
+	public QualifierFilterEntity(String key, String value, ComparisonOperator comp, TokenType keyType, TokenType valueType) {
+		this.key = key;
+		this.value = value;
+		this.op = comp;
+		this.keyType = keyType;
+		this.valueType = valueType;
+	}
+
+	// ---- key ----
+
+	public String getKey() {
+		return key;
+	}
+
+	public void setKey(String key) {
+		this.key = key;
+	}
+
+	public TokenType getKeyType(){
+		return keyType;
+	}
+
+	public void setKeyType(TokenType keyType){
+		this.keyType = keyType;
+	}
+
+	// ---- operator ----
+
+	public ComparisonOperator getOp() {
+		return op;
+	}
+
+	public void setOp(ComparisonOperator op) {
+		this.op = op;
+	}
+
+	// ---- value ----
+
+	public String getValue() {
+		return value;
+	}
+
+	public void setValue(String value) {
+		this.value = value;
+	}
+
+	public TokenType getValueType() {
+		return valueType;
+	}
+
+	public void setValueType(TokenType valueType) {
+		this.valueType = valueType;
+	}
+
+	/** @return the condition rendered as "key op value" */
+	@Override
+	public String toString() {
+		return String.format("%s %s %s",key,op,value);
+	}
+
+	/**
+	 * Writes key, value, operator name, key type name and value type name, in that order.
+	 */
+	@Override
+	public void write(DataOutput out) throws IOException {
+		out.writeUTF(key);
+		out.writeUTF(getValue());
+		out.writeUTF(op.name());
+		out.writeUTF(keyType.name());
+		out.writeUTF(valueType.name());
+	}
+
+	/**
+	 * Reads the fields back in the order in which write emits them.
+	 */
+	@Override
+	public void readFields(DataInput in) throws IOException {
+		key = in.readUTF();
+		value = in.readUTF();
+		op = ComparisonOperator.valueOf(in.readUTF());
+		keyType = TokenType.valueOf(in.readUTF());
+		valueType = TokenType.valueOf(in.readUTF());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
new file mode 100755
index 0000000..a4b97ea
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HBase filter that decides per row by handing the row's KeyValues to a comparator: the row is
+ * filtered out when the comparator's compareTo returns 0.
+ *
+ * TODO: Critical performance problem!!!
+ * TODO: Refactor to specified multi-column filter so that avoid return all qualifier columns from region server to client side
+ *
+ * @since 2014/11/17
+ */
+public class RowValueFilter extends FilterBase {
+    private final static Logger LOG = LoggerFactory.getLogger(RowValueFilter.class);
+    // Decision for the current row; set in filterRow(List) and cleared in reset()
+    private boolean filterOutRow = false;
+    // Evaluates a whole row; also carries the serialized state of this filter
+    private WritableComparable<List<KeyValue>> comparator;
+
+    // TODO: Use qualifiers to reduce network transfer
+//    private List<byte[]> qualifiers;
+
+    /**
+     * Creates a filter without a comparator; used by parseFrom, which then fills it via readFields.
+     */
+    public RowValueFilter(){}
+
+    /**
+     * Filter out row if WritableComparable.compareTo return 0
+     * @param comparator <code>WritableComparable[List[KeyValue]]</code>
+     */
+    public RowValueFilter(WritableComparable<List<KeyValue>> comparator){
+        this.comparator = comparator;
+    }
+
+//    public RowValueFilter(List<byte[]> qualifiers,WritableComparable<List<KeyValue>> comparator){
+//        this.qualifiers = qualifiers;
+//        this.comparator = comparator;
+//    }
+
+    /**
+     * Old interface in hbase-0.94. Writes only the comparator.
+     *
+     * @param out destination
+     * @throws IOException if the comparator fails to write
+     */
+    @Deprecated
+    public void write(DataOutput out) throws IOException {
+        this.comparator.write(out);
+    }
+
+    /**
+     * Old interface in hbase-0.94. Always restores the comparator as a BooleanExpressionComparator,
+     * whatever comparator type was written.
+     *
+     * @param in source
+     * @throws IOException if the comparator fails to read
+     */
+//    @Override
+    @Deprecated
+    public void readFields(DataInput in) throws IOException {
+        this.comparator = new BooleanExpressionComparator();
+        this.comparator.readFields(in);
+    }
+
+    /**
+     * Serializes the filter by writing the comparator into a byte array.
+     *
+     * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff based
+     *
+     * @return the bytes produced by the comparator's write method
+     * @throws IOException if the comparator fails to write
+     */
+    @Override
+    public byte[] toByteArray() throws IOException {
+        ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput();
+        this.comparator.write(byteArrayDataOutput);
+        return byteArrayDataOutput.toByteArray();
+    }
+
+    /**
+     * Counterpart of toByteArray: creates a filter and restores its comparator through readFields.
+     *
+     * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff based
+     *
+     * @param pbBytes bytes as produced by toByteArray
+     * @return the restored filter
+     * @throws DeserializationException wrapping any IOException raised while reading
+     */
+    // Override static method
+    public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException {
+        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbBytes);
+        RowValueFilter filter = new RowValueFilter();
+        try {
+            filter.readFields(byteArrayDataInput);
+        } catch (IOException e) {
+            LOG.error("Got error to deserialize RowValueFilter from PB bytes",e);
+            throw new DeserializationException(e);
+        }
+        return filter;
+    }
+
+    /**
+     * @return always true, because this filter decides in filterRow
+     */
+    @Override
+    public boolean hasFilterRow(){
+        return true;
+    }
+
+    /**
+     * Evaluates the row and remembers the decision: the row is marked to be filtered out when the
+     * comparator returns 0 for it.
+     *
+     * @param row the KeyValues of the current row
+     */
+    @Override
+    public void filterRow(List<KeyValue> row) {
+        filterOutRow = (this.comparator.compareTo(row) == 0);
+    }
+
+    /**
+     * Clears the decision remembered for the previous row.
+     */
+    @Override
+    public void reset() {
+        this.filterOutRow = false;
+    }
+
+    /**
+     * @return the decision taken in filterRow(List) for the current row
+     */
+    @Override
+    public boolean filterRow(){
+        return filterOutRow;
+    }
+
+    @Override
+    public String toString() {
+        return super.toString()+" ( "+this.comparator.toString()+" )";
+    }
+
+//    public List<byte[]> getQualifiers() {
+//        return qualifiers;
+//    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
new file mode 100755
index 0000000..ecaf8cc
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.io.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <h1>TypedByteArrayComparator</h1>
+ *
+ * Compare byte array: <code>byte[] value</code> with class type: <code>Class type</code>
+ *
+ * <br/>
+ * <br/>
+ * Built-in support:
+ *
+ *  <pre>
+ *    Double
+ *    double
+ *    Integer
+ *    int
+ *    Long
+ *    long
+ *    Short
+ *    short
+ *    Boolean
+ *    boolean
+ *  </pre>
+ *
+ *  And can be extend by defining new {@link RawComparator} and register with  {@link #define(Class type, RawComparator comparator)}
+ * <br/>
+ * <br/>
+ */
+public class TypedByteArrayComparator extends ByteArrayComparable {
+    private final static Logger LOG = LoggerFactory.getLogger(TypedByteArrayComparator.class);
+
+    private Class type;
+
+    // Not need to be writable
+    private RawComparator comparator;
+
+    /**
+     * Default constructor for writable
+     */
+    @SuppressWarnings("unused")
+    public TypedByteArrayComparator(){
+        super(null);
+    }
+
+    public TypedByteArrayComparator(byte[] value, Class type){
+        super(value);
+        this.type = type;
+        this.comparator = get(this.type);
+        if(this.comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type);
+    }
+
+    /**
+     * @param in hbase-0.94 interface
+     * @throws IOException
+     */
+//    @Override
+    public void readFields(DataInput in) throws IOException {
+//        super.readFields(in);
+        try {
+            String _type = in.readUTF();
+            type = _primitiveTypeClassMap.get(_type);
+            if(type == null) {
+                type = Class.forName(_type);
+            }
+            comparator = get(type);
+            if(comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e.getMessage(),e);
+        }
+    }
+
+    /**
+     * @param out hbase-0.94 interface
+     * @throws IOException
+     */
+//    @Override
+    public void write(DataOutput out) throws IOException {
+//        super.write(out);
+        String typeName = type.getName();
+        out.writeUTF(typeName);
+    }
+
+    /**
+     * For hbase 0.98
+     *
+     * @return serialized byte array
+     */
+    @Override
+    public byte[] toByteArray() {
+        ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput();
+        try {
+            this.write(byteArrayDataOutput);
+            return byteArrayDataOutput.toByteArray();
+        } catch (IOException e) {
+            LOG.error("Failed to serialize due to: "+e.getMessage(),e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * For hbase 0.98
+     *
+     * @param bytes raw byte array
+     * @return Comparator instance
+     * @throws DeserializationException
+     */
+    public static TypedByteArrayComparator parseFrom(final byte [] bytes)
+            throws DeserializationException {
+        TypedByteArrayComparator comparator = new TypedByteArrayComparator();
+        ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(bytes);
+        try {
+            comparator.readFields(byteArrayDataInput);
+        } catch (IOException e) {
+            LOG.error("Got error to deserialize TypedByteArrayComparator from PB bytes",e);
+            throw new DeserializationException(e);
+        }
+        return comparator;
+    }
+
+    @Override
+    public int compareTo(byte[] value, int offset, int length) {
+        return this.comparator.compare(this.getValue(), 0, this.getValue().length, value, offset, length);
+    }
+
+    /**
+     * <ol>
+     * <li>Try registered comparator</li>
+     * <li>If not found, try all possible WritableComparator</li>
+     * </ol>
+     *
+     * If not found finally, throw new IllegalArgumentException("unable to get comparator for class: "+type);
+     *
+     * @param type value type class
+     * @return RawComparator
+     */
+    public static RawComparator get(Class type){
+        RawComparator comparator = null;
+        try {
+            comparator = _typedClassComparator.get(type);
+        }catch (ClassCastException ex){
+            // ignore
+        }
+        try {
+            if (comparator == null) comparator = WritableComparator.get(type);
+        }catch (ClassCastException ex){
+            // ignore
+        }
+        return comparator;
+    }
+
+    private final static Map<Class,RawComparator> _typedClassComparator = new HashMap<Class, RawComparator>();
+    public static void define(Class type, RawComparator comparator){
+        _typedClassComparator.put(type,comparator);
+    }
+
+    static{
+        define(Double.class, WritableComparator.get(DoubleWritable.class));
+        define(double.class, WritableComparator.get(DoubleWritable.class));
+        define(Integer.class, WritableComparator.get(IntWritable.class));
+        define(int.class, WritableComparator.get(IntWritable.class));
+        define(Long.class, WritableComparator.get(LongWritable.class));
+        define(long.class, WritableComparator.get(LongWritable.class));
+        define(Short.class, WritableComparator.get(ShortWritable.class));
+        define(short.class, WritableComparator.get(ShortWritable.class));
+        define(Boolean.class, WritableComparator.get(BooleanWritable.class));
+        define(boolean.class, WritableComparator.get(BooleanWritable.class));
+    }
+
+    /**
+     * Because {@link Class#forName } can't find class for primitive type
+     */
+    private final static Map<String,Class> _primitiveTypeClassMap = new HashMap<String, Class>();
+    static {
+        _primitiveTypeClassMap.put(int.class.getName(),int.class);
+        _primitiveTypeClassMap.put(double.class.getName(),double.class);
+        _primitiveTypeClassMap.put(long.class.getName(),long.class);
+        _primitiveTypeClassMap.put(short.class.getName(),short.class);
+        _primitiveTypeClassMap.put(boolean.class.getName(),boolean.class);
+        _primitiveTypeClassMap.put(char.class.getName(),char.class);
+        _primitiveTypeClassMap.put(byte.class.getName(),byte.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
new file mode 100755
index 0000000..418ab33
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+/**
+ * Common base of the index-backed {@link LogReader} implementations; it only contributes the
+ * shared HBASE-2198 workaround.
+ */
+public abstract class IndexLogReader implements LogReader {
+
+	// TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. More graceful implementation should use SingleColumnValueExcludeFilter,
+	// but it's complicated in current implementation.
+	/**
+	 * Makes sure the columns tested by the filter are requested by the Get.
+	 * For every {@link SingleColumnValueFilter} reachable through nested {@link FilterList}s:
+	 * adds the filter's whole family when no output qualifiers are given, otherwise adds just
+	 * the filter's own column. Any other filter type (or a null filter) is left alone.
+	 *
+	 * @param get        request to extend
+	 * @param filter     filter (possibly a list of filters) attached to the request; may be null
+	 * @param qualifiers output qualifiers of the query, or null when everything is returned
+	 */
+	protected static void workaroundHBASE2198(Get get, Filter filter,byte[][] qualifiers) {
+		if (filter instanceof SingleColumnValueFilter) {
+			final SingleColumnValueFilter columnFilter = (SingleColumnValueFilter) filter;
+			if (qualifiers != null) {
+				get.addColumn(columnFilter.getFamily(), columnFilter.getQualifier());
+			} else {
+				get.addFamily(columnFilter.getFamily());
+			}
+			return;
+		}
+		if (!(filter instanceof FilterList)) {
+			return;
+		}
+		// Recurse into every member of the list.
+		for (Filter member : ((FilterList) filter).getFilters()) {
+			workaroundHBASE2198(get, member, qualifiers);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
new file mode 100755
index 0000000..9e059f2
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.*;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Stream reader that pulls entities through an index-backed {@link LogReader} and pushes each one
+ * to the registered listeners. Subclasses provide the concrete reader via {@link #createIndexReader()}.
+ */
+public abstract class IndexStreamReader  extends StreamReader {
+	protected final IndexDefinition indexDef;
+	protected final SearchCondition condition;
+	protected final List<byte[]> indexRowkeys;
+	// Created lazily on the first readAsStream() call
+	protected LogReader<InternalLog> reader;
+	// Largest / smallest entity timestamp seen so far; 0 means "nothing seen yet"
+	protected long lastTimestamp = 0;
+	protected long firstTimestamp = 0;
+	
+	protected static final Logger LOG = LoggerFactory.getLogger(IndexStreamReader.class);
+
+	public IndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+		this.indexDef = indexDef;
+		this.condition = condition;
+		this.indexRowkeys = indexRowkeys;
+		this.reader = null;
+	}
+
+	@Override
+	public long getLastTimestamp() {
+		return this.lastTimestamp;
+	}
+
+	@Override
+	public long getFirstTimestamp() {
+		return this.firstTimestamp;
+	}
+
+	/**
+	 * Opens the index reader, converts every log it yields into an entity, tracks the first and
+	 * last timestamps, notifies all listeners, and stops once the page size of the condition has
+	 * been delivered. The reader is closed in all cases.
+	 *
+	 * @throws Exception an IOException from the reader is logged and rethrown; anything else propagates as is
+	 */
+	@Override
+	public void readAsStream() throws Exception {
+		if (reader == null) {
+			reader = createIndexReader();
+		}
+		final EntityDefinition entityDef = indexDef.getEntityDefinition();
+		try {
+			reader.open();
+			int delivered = 0;
+			for (InternalLog log = reader.read(); log != null; log = reader.read()) {
+				final TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef);
+				entity.setSerializeAlias(condition.getOutputAlias());
+				entity.setSerializeVerbose(condition.isOutputVerbose());
+
+				final long timestamp = entity.getTimestamp();
+				if (lastTimestamp == 0 || timestamp > lastTimestamp) {
+					lastTimestamp = timestamp;
+				}
+				if (firstTimestamp == 0 || timestamp < firstTimestamp) {
+					firstTimestamp = timestamp;
+				}
+				for (EntityCreationListener listener : _listeners) {
+					listener.entityCreated(entity);
+				}
+				delivered++;
+				if (delivered == condition.getPageSize()) {
+					break;
+				}
+			}
+		} catch (IOException ioe) {
+			LOG.error("Fail reading log", ioe);
+			throw ioe;
+		} finally {
+			reader.close();
+		}
+	}
+
+	/**
+	 * @return the reader used by {@link #readAsStream()} to fetch logs through the index
+	 */
+	protected abstract LogReader createIndexReader();
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
new file mode 100755
index 0000000..e6a5c96
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.Filter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Reads entities through a non-clustered index. For every index rowkey a scan over the index
+ * rows is prepared; the qualifiers of each scanned index row are used as entity rowkeys, and the
+ * entities are then fetched with batched multi-gets.
+ */
+public class NonClusteredIndexLogReader extends IndexLogReader {
+	private final IndexDefinition indexDef;
+	private final List<byte[]> indexRowkeys;
+	private final byte[][] qualifiers;
+	private final Filter filter;
+	private HTableInterface tbl;
+	private boolean isOpen = false;
+	// Current batch of fetched entities and the read position inside it
+	private Result[] results;
+	private int index = -1;
+	private final List<Scan> scans;
+	private int currentScanIndex = 0;
+	private ResultScanner currentResultScanner;
+
+	// Max tag key/value. 
+	private static final byte[] MAX_TAG_VALUE_BYTES = {(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF};
+	private static final int BATCH_MULTIGET_SIZE = 1000;
+
+	/**
+	 * @param indexDef     index definition (also gives access to the entity definition)
+	 * @param indexRowkeys start rowkeys of the index ranges to scan
+	 * @param qualifiers   output qualifiers, or null to return the whole column family
+	 * @param filter       filter applied to the entity gets; may be null
+	 */
+	public NonClusteredIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) {
+		this.indexDef = indexDef;
+		this.indexRowkeys = indexRowkeys;
+		this.qualifiers = qualifiers;
+		this.filter = filter;
+		this.scans = buildScans();
+	}
+	
+
+	/**
+	 * Builds one scan per index rowkey, covering [rowkey, rowkey + MAX_TAG_VALUE_BYTES).
+	 */
+	private List<Scan> buildScans() {
+		final ArrayList<Scan> result = new ArrayList<Scan>(indexRowkeys.size());
+		for (byte[] rowkey : indexRowkeys) {
+			Scan s = new Scan();
+			s.setStartRow(rowkey);
+			// In rowkey the tag key/value is sorted by the hash code of the key, so MAX_TAG_VALUE_BYTES is enough as the end key
+			final byte[] stopRowkey = ByteUtil.concat(rowkey, MAX_TAG_VALUE_BYTES);
+			s.setStopRow(stopRowkey);
+			// TODO the # of cached rows should be minimum of (pagesize and 100)
+			int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize();
+			s.setCaching(cs);
+			// TODO not optimized for all applications
+			s.setCacheBlocks(true);
+			// scan specified columnfamily for all qualifiers
+			s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes());
+			result.add(s);
+		}
+		return result;
+	}
+
+	/**
+	 * Acquires the table, starts the first scan and loads the first batch of entities.
+	 * Calling it again while the reader is open does nothing.
+	 *
+	 * @throws IOException if the table cannot be obtained or the first batch cannot be read
+	 */
+	@Override
+	public void open() throws IOException {
+		if (isOpen)
+			return; // silently return
+		try {
+			tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
+		} catch (RuntimeException ex) {
+			throw new IOException(ex);
+		}
+		currentScanIndex = 0;
+		openNewScan();
+		fillResults();
+		// Mark the reader open only once everything succeeded, so the guard above really
+		// protects against acquiring a second table on a repeated open().
+		isOpen = true;
+	}
+
+	/**
+	 * Closes the current scanner and starts the next pending scan.
+	 *
+	 * @return false when all scans have been consumed
+	 */
+	private boolean openNewScan() throws IOException {
+		closeCurrentScanResult();
+		if (currentScanIndex >= scans.size()) {
+			return false;
+		}
+		final Scan scan = scans.get(currentScanIndex++);
+		currentResultScanner = tbl.getScanner(scan);
+		return true;
+	}
+
+	/**
+	 * Collects entity rowkeys from the index scans until roughly BATCH_MULTIGET_SIZE gets are
+	 * gathered (a single index row may push the batch past that size), then fetches them in one
+	 * multi-get. Sets results to null when the scans are exhausted.
+	 */
+	private void fillResults() throws IOException {
+		if (currentResultScanner == null) {
+			return;
+		}
+		index = 0;
+		int count = 0;
+		Result r = null;
+		final List<Get> gets = new ArrayList<Get>(BATCH_MULTIGET_SIZE);
+		final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes();
+		while (count < BATCH_MULTIGET_SIZE) {
+			r = currentResultScanner.next();
+			if (r == null) {
+				if (openNewScan()) {
+					continue;
+				} else {
+					break;
+				}
+			}
+			final java.util.Map<byte[], byte[]> entityRowkeys = r.getFamilyMap(family);
+			if (entityRowkeys == null) {
+				// Nothing stored under the family for this index row; same guard as UniqueIndexLogReader
+				continue;
+			}
+			for (byte[] rowkey : entityRowkeys.keySet()) {
+				if (rowkey.length == 0) {	// invalid rowkey
+					continue;
+				}
+				final Get get = new Get(rowkey);
+				if (filter != null) {
+					get.setFilter(filter);
+				}
+				if(qualifiers != null) {
+					for (int j = 0; j < qualifiers.length; ++j) {
+						// Return the specified qualifiers
+						get.addColumn(family, qualifiers[j]);
+					}
+				}else {
+					get.addFamily(family);
+				}
+				workaroundHBASE2198(get, filter,qualifiers);
+				gets.add(get);
+				++count;
+			}
+		}
+		if (count == 0) {
+			results = null;
+			return;
+		}
+		results = tbl.get(gets);
+		if (results == null || results.length == 0) {
+			fillResults();
+		}
+	}
+
+
+	private void closeCurrentScanResult() {
+		if (currentResultScanner != null) {
+			currentResultScanner.close();
+			currentResultScanner = null;
+		}
+	}
+
+
+	/**
+	 * Closes the active scanner, then releases the table. Safe to call more than once: the table
+	 * handle is dropped after the first release, and the reader can be opened again afterwards.
+	 */
+	@Override
+	public void close() throws IOException {
+		isOpen = false;
+		try {
+			// The scanner was obtained from the table, so shut it down first.
+			closeCurrentScanResult();
+		} finally {
+			if(tbl != null){
+				final HTableInterface released = tbl;
+				tbl = null;
+				new HTableFactory().releaseHTableInterface(released);
+			}
+		}
+	}
+
+	/**
+	 * @return the next entity log, or null when there is nothing left
+	 * @throws IllegalArgumentException if the reader is not open
+	 */
+	@Override
+	public InternalLog read() throws IOException {
+		if (tbl == null) {
+			throw new IllegalArgumentException("Haven't open before reading");
+		}
+		
+		Result r = null;
+		InternalLog t = null;
+		while ((r = getNextResult()) != null) {
+			if (r.getRow() == null) {
+				// The entity get returned no row; skip it
+				continue;
+			}
+			t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers);
+			break;
+		}
+		return t;
+	}
+
+
+	/**
+	 * Returns the next fetched result, loading a new batch when the current one is used up.
+	 */
+	private Result getNextResult() throws IOException {
+		if (results == null || results.length == 0 || index >= results.length) {
+			fillResults();
+		}
+		if (results == null || results.length == 0 || index >= results.length) {
+			return null;
+		}
+		return results[index++];
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
new file mode 100755
index 0000000..ec5631a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link IndexStreamReader} that reads through a non-clustered index by means of a
+ * {@link NonClusteredIndexLogReader}.
+ */
+public class NonClusteredIndexStreamReader extends IndexStreamReader {
+	/**
+	 * Lets the index definition compute the index rowkeys for the query of the condition.
+	 *
+	 * @throws IllegalArgumentException if the index reports that the query cannot be served as a non-clustered index lookup
+	 */
+	public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
+		super(indexDef, condition, new ArrayList<byte[]>());
+		// canGoThroughIndex receives the (still empty) rowkey list held by this reader
+		final IndexType matched = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
+		final boolean usable = IndexType.NON_CLUSTER_INDEX.equals(matched);
+		if (!usable) {
+			throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression());
+		}
+	}
+
+	/**
+	 * Uses index rowkeys that the caller has already determined.
+	 */
+	public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+		super(indexDef, condition, indexRowkeys);
+	}
+
+	/**
+	 * @return a log reader restricted to the requested output fields, or to all fields when the
+	 *         condition asks for everything
+	 */
+	@Override
+	protected LogReader createIndexReader() {
+		final EntityDefinition entityDef = indexDef.getEntityDefinition();
+		final byte[][] outputQualifiers = condition.isOutputAll()
+				? null
+				: HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields());
+		return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
new file mode 100755
index 0000000..1c16dc8
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+
+/**
+ * Reads entities directly by rowkey: all rowkeys are fetched in one multi-get when the reader is
+ * opened and handed out one by one through {@link #read()}.
+ */
+public class RowKeyLogReader extends IndexLogReader {
+	private final EntityDefinition ed;
+	private final List<byte[]> rowkeys;
+	private final byte[][] qualifiers;
+	private HTableInterface tbl;
+	private boolean isOpen = false;
+	private Result[] entityResult;
+	// Position of the last result handed out; -1 before the first read
+	private int getIndex = -1;
+
+	/**
+	 * Reads a single row with all qualifiers of the column family.
+	 */
+	public RowKeyLogReader(EntityDefinition ed, byte[] rowkey) {
+		this.ed = ed;
+		this.rowkeys = new ArrayList<>();
+		this.rowkeys.add(rowkey);
+		this.qualifiers = null;
+	}
+
+	/**
+	 * Reads a single row, restricted to the given qualifiers when they are not null.
+	 */
+	public RowKeyLogReader(EntityDefinition ed, byte[] rowkey,byte[][] qualifiers) {
+		this.ed = ed;
+		this.rowkeys = new ArrayList<>();
+		this.rowkeys.add(rowkey);
+		this.qualifiers = qualifiers;
+	}
+
+	/**
+	 * Reads several rows, restricted to the given qualifiers when they are not null.
+	 * The rowkey list is used as is, not copied.
+	 */
+	public RowKeyLogReader(EntityDefinition ed, List<byte[]> rowkeys,byte[][] qualifiers) {
+		this.ed = ed;
+		this.rowkeys = rowkeys;
+		this.qualifiers = qualifiers;
+	}
+
+	/**
+	 * Acquires the table and fetches all requested rows. Does nothing if already opened.
+	 *
+	 * @throws IOException if the table cannot be obtained or the multi-get fails
+	 */
+	@Override
+	public void open() throws IOException {
+		if (isOpen)
+			return; // silently return
+		try {
+			tbl = EagleConfigFactory.load().getHTable(ed.getTable());
+		} catch (RuntimeException ex) {
+			throw new IOException(ex);
+		}
+		final byte[] family = ed.getColumnFamily().getBytes();
+		List<Get> gets = new ArrayList<>(this.rowkeys.size());
+
+		for(byte[] rowkey:rowkeys) {
+			Get get = new Get(rowkey);
+			get.addFamily(family);
+
+			if(qualifiers != null) {
+				for(byte[] qualifier: qualifiers){
+					get.addColumn(family,qualifier);
+				}
+			}
+
+			gets.add(get);
+		}
+
+		entityResult = tbl.get(gets);
+		isOpen = true;
+	}
+
+	/**
+	 * Releases the table. Safe to call more than once: the handle is dropped after the first
+	 * release. Results already fetched stay readable.
+	 */
+	@Override
+	public void close() throws IOException {
+		if(tbl != null){
+			final HTableInterface released = tbl;
+			tbl = null;
+			new HTableFactory().releaseHTableInterface(released);
+		}
+	}
+
+	/**
+	 * @return the next entity that was found, or null when the reader was never opened or all
+	 *         results have been consumed
+	 */
+	@Override
+	public InternalLog read() throws IOException {
+		if(entityResult == null){
+			return null;
+		}
+		while (this.getIndex < entityResult.length - 1) {
+			final Result r = entityResult[++getIndex];
+			// A rowkey that matched nothing yields a result without a row; skip it the same
+			// way the other index readers do instead of handing it to the parser.
+			if (r == null || r.getRow() == null) {
+				continue;
+			}
+			return HBaseInternalLogHelper.parse(ed, r, this.qualifiers);
+		}
+		return null;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
new file mode 100755
index 0000000..8ff3448
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+
+public class UniqueIndexLogReader extends IndexLogReader {
+
+	private final IndexDefinition indexDef;
+	private final List<byte[]> indexRowkeys; 
+	private final byte[][] qualifiers;
+	private final Filter filter;
+	private HTableInterface tbl;
+	private boolean isOpen = false;
+	private Result[] entityResults;
+	private int index = -1;
+
+	public UniqueIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) {
+		this.indexDef = indexDef;
+		this.indexRowkeys = indexRowkeys;
+		this.qualifiers = qualifiers;
+		this.filter = filter;
+	}
+
+	@Override
+	public void open() throws IOException {
+		if (isOpen)
+			return; // silently return
+		try {
+			tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
+		} catch (RuntimeException ex) {
+			throw new IOException(ex);
+		}
+		final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes();
+        final List<Get> indexGets = new ArrayList<>();
+        for (byte[] rowkey : indexRowkeys) {
+            Get get = new Get(rowkey);
+            // Return all index qualifiers
+            get.addFamily(family);
+            indexGets.add(get);
+        }
+        final Result[] indexResults = tbl.get(indexGets);
+        indexGets.clear();
+        for (Result indexResult : indexResults) {
+        	final NavigableMap<byte[], byte[]> map = indexResult.getFamilyMap(family);
+        	if (map == null) {
+        		continue;
+        	}
+        	for (byte[] entityRowkey : map.keySet()) {
+                Get get = new Get(entityRowkey);
+                if (filter != null) {
+                	get.setFilter(filter);
+                }
+				if(qualifiers == null) {
+					// filter all qualifiers if output qualifiers are null
+					get.addFamily(family);
+				}else {
+					for (int i = 0; i < qualifiers.length; ++i) {
+						// Return the specified qualifiers
+						get.addColumn(family, qualifiers[i]);
+					}
+				}
+				workaroundHBASE2198(get, filter,qualifiers);
+        		indexGets.add(get);
+        	}
+        }
+        entityResults = tbl.get(indexGets);
+		isOpen = true;
+	}
+
+	@Override
+	public void close() throws IOException {
+		if(tbl != null){
+			new HTableFactory().releaseHTableInterface(tbl);
+		}
+	}
+
+	@Override
+	public InternalLog read() throws IOException {
+		if (entityResults == null) {
+			throw new IllegalArgumentException("entityResults haven't been initialized before reading");
+		}
+		InternalLog t = null;
+		while (entityResults.length > ++index) {
+			Result r = entityResults[index];
+			if (r != null) {
+				if (r.getRow() == null) {
+					continue;
+				}
+				t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers);
+				break;
+			}
+		}
+		return t;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
new file mode 100755
index 0000000..0391d57
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link IndexStreamReader} whose index reader is a {@code UniqueIndexLogReader}.
+ */
+public class UniqueIndexStreamReader extends IndexStreamReader {
+
+	/**
+	 * Builds a reader for the given condition after asking the index definition
+	 * whether the query expression can be served by this index.
+	 *
+	 * @param indexDef  index definition consulted for the query
+	 * @param condition search condition supplying the query expression
+	 * @throws IllegalArgumentException if the index definition does not report
+	 *         {@code IndexType.UNIQUE_INDEX} for the query expression
+	 */
+	public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
+		super(indexDef, condition, new ArrayList<byte[]>());
+		// indexRowkeys is passed into the check; presumably it gets filled with the matching index row keys
+		final IndexType resolved = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
+		final boolean servedByUniqueIndex = IndexType.UNIQUE_INDEX.equals(resolved);
+		if (!servedByUniqueIndex) {
+			throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression());
+		}
+	}
+
+	/**
+	 * Builds a reader from index row keys supplied by the caller; no index check is made.
+	 *
+	 * @param indexDef     index definition to read through
+	 * @param condition    search condition of the query
+	 * @param indexRowkeys index row keys handed to the parent reader
+	 */
+	public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+		super(indexDef, condition, indexRowkeys);
+	}
+
+	/**
+	 * Creates the log reader for this index. Output qualifiers are passed as
+	 * {@code null} when the condition asks for all output; otherwise they are
+	 * derived from the condition's output fields.
+	 *
+	 * @return a new {@code UniqueIndexLogReader} for the index row keys and filter
+	 */
+	@Override
+	protected LogReader createIndexReader() {
+		final EntityDefinition definition = indexDef.getEntityDefinition();
+		final byte[][] selectedQualifiers = condition.isOutputAll()
+				? null
+				: HBaseInternalLogHelper.getOutputQualifiers(definition, condition.getOutputFields());
+		return new UniqueIndexLogReader(indexDef, indexRowkeys, selectedQualifiers, condition.getFilter());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
new file mode 100755
index 0000000..cf40e31
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * @since : 7/3/14,2014
+ */
+public class BooleanSerDeser implements EntitySerDeser<Boolean> {
+
+	public BooleanSerDeser(){}
+
+	@Override
+	public Boolean deserialize(byte[] bytes){
+		if(bytes != null && bytes.length > 0){
+			if(bytes[0] == 0){
+				return false;
+			}else if(bytes[0] == 1){
+				return true;
+			}
+		}
+		return null;
+	}
+
+	@Override
+	public byte[] serialize(Boolean obj){
+		if(obj != null){
+			if(obj){
+				return new byte[]{1};
+			}else{
+				return new byte[]{0};
+			}
+		}
+		return null;
+	}
+
+	@Override
+	public Class<Boolean> type() {
+		return Boolean.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
new file mode 100644
index 0000000..b64e528
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation, retained at runtime, that carries a single string value.
+ * NOTE(review): presumably the value names the storage column/qualifier of the
+ * annotated field - confirm against the code that reads this annotation.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface Column {
+	/**
+	 * @return the configured value; the empty string when not specified
+	 */
+	String value() default "";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
new file mode 100644
index 0000000..6e3e9c6
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, that carries a single string value.
+ * NOTE(review): presumably the value names the storage column family of the
+ * annotated entity class - confirm against the code that reads this annotation.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ColumnFamily {
+	/**
+	 * @return the configured value; {@code "f"} when not specified
+	 */
+	String value() default "f";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
new file mode 100644
index 0000000..27b011c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Converts {@code double[][]} values to and from a length-prefixed byte layout:
+ * an int row count, then for every row an int column count followed by that many
+ * 8-byte doubles. A {@code null} row is stored as a column count of {@code -1}
+ * with no values after it.
+ *
+ * @since 7/22/15
+ */
+public class Double2DArraySerDeser implements EntitySerDeser<double[][]> {
+    /** Number of bytes written for each double value. */
+    private static final int SIZE = 8;
+
+    /**
+     * Rebuilds a two-dimensional array from its serialized form.
+     *
+     * @param bytes serialized form as produced by {@link #serialize(double[][])}; may be {@code null}
+     * @return the decoded array, or {@code null} when {@code bytes} is {@code null};
+     *         a row whose stored column count is negative is decoded as {@code null}
+     */
+    @Override
+    public double[][] deserialize(byte[] bytes){
+        // mirror serialize(), which maps a null value to null bytes
+        if(bytes == null) {
+            return null;
+        }
+        int offset = 0;
+        // leading int: number of rows
+        int rowSize = ByteUtil.bytesToInt(bytes, offset);
+        offset += 4;
+
+        double[][] data = new double[rowSize][];
+        for(int i=0; i<rowSize; i++) {
+            // per-row int: number of columns, negative for a null row
+            int colSize = ByteUtil.bytesToInt(bytes, offset);
+            offset += 4;
+            double[] values = null;
+            if (colSize >= 0){
+                values = new double[colSize];
+                for (int j = 0; j < colSize; j++) {
+                    values[j] = ByteUtil.bytesToDouble(bytes, offset);
+                    offset += SIZE;
+                }
+            }
+            data[i] = values;
+        }
+
+        return data;
+    }
+
+    /**
+     * Serializes a two-dimensional array into the length-prefixed layout described
+     * on the class.
+     *
+     * @param obj array to serialize; may be {@code null} and may contain {@code null} rows
+     * @return the serialized bytes, or {@code null} when {@code obj} is {@code null}
+     */
+    @Override
+    public byte[] serialize(double[][] obj){
+        if(obj == null) {
+            return null;
+        }
+        // Only the (byte[], int, int) overload is used: on ByteArrayOutputStream it declares
+        // no IOException, so there is nothing to catch, swallow or close.
+        ByteArrayOutputStream data = new ByteArrayOutputStream();
+        writeAll(data, ByteUtil.intToBytes(obj.length));
+
+        for(double[] row : obj){
+            if(row == null){
+                // -1 marks a null row; no values follow
+                data.write(ByteUtil.intToBytes(-1), 0, 4);
+                continue;
+            }
+            writeAll(data, ByteUtil.intToBytes(row.length));
+            for(double d : row){
+                data.write(ByteUtil.doubleToBytes(d), 0, SIZE);
+            }
+        }
+        return data.toByteArray();
+    }
+
+    /** Appends the whole array to the in-memory buffer. */
+    private static void writeAll(ByteArrayOutputStream out, byte[] chunk) {
+        out.write(chunk, 0, chunk.length);
+    }
+
+    /**
+     * @return the value class handled by this serializer
+     */
+    @Override
+    public Class<double[][]> type() {
+        return double[][].class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
new file mode 100755
index 0000000..d87e31c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Converts {@code double[]} values to and from a length-prefixed byte layout:
+ * a 4-byte element count followed by 8 bytes per element.
+ */
+public class DoubleArraySerDeser implements EntitySerDeser<double[]>{
+
+	/** Number of bytes taken by the element-count prefix. */
+	private static final int HEADER = 4;
+	/** Number of bytes written for each double value. */
+	private static final int SIZE = 8;
+
+	public DoubleArraySerDeser(){}
+
+	/**
+	 * Rebuilds a double array from its serialized form.
+	 *
+	 * @param bytes serialized form as produced by {@link #serialize(double[])}; may be {@code null}
+	 * @return the decoded array, or {@code null} when the input is {@code null}, its length
+	 *         is not the prefix plus a whole number of doubles, or the stored element count
+	 *         is negative or larger than the payload can hold
+	 */
+	@Override
+	public double[] deserialize(byte[] bytes){
+		// any length below HEADER also fails the modulo test, so the prefix is readable afterwards
+		if(bytes == null || (bytes.length - HEADER) % SIZE != 0) {
+			return null;
+		}
+		// leading int: number of elements
+		int size = ByteUtil.bytesToInt(bytes, 0);
+		// a count the payload cannot satisfy is treated like any other malformed input
+		if(size < 0 || size > (bytes.length - HEADER) / SIZE) {
+			return null;
+		}
+		double[] values = new double[size];
+		int offset = HEADER;
+		for(int i=0; i<size; i++){
+			values[i] = ByteUtil.bytesToDouble(bytes, offset);
+			offset += SIZE;
+		}
+		return values;
+	}
+
+	/**
+	 * Serializes a double array into the length-prefixed layout described on the class.
+	 *
+	 * @param obj array to serialize; may be {@code null}
+	 * @return the serialized bytes, or {@code null} when {@code obj} is {@code null}
+	 */
+	@Override
+	public byte[] serialize(double[] obj){
+		if(obj == null) {
+			return null;
+		}
+		int size = obj.length;
+		byte[] array = new byte[HEADER + SIZE*size];
+		byte[] first = ByteUtil.intToBytes(size);
+		int offset = 0;
+		System.arraycopy(first, 0, array, offset, first.length);
+		offset += first.length;
+		for(int i=0; i<size; i++){
+			System.arraycopy(ByteUtil.doubleToBytes(obj[i]), 0, array, offset, SIZE);
+			offset += SIZE;
+		}
+		return array;
+	}
+
+	/**
+	 * @return the value class handled by this serializer
+	 */
+	@Override
+	public Class<double[]> type() {
+		return double[].class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
new file mode 100755
index 0000000..330a99d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Converts {@link Double} values to and from bytes by delegating to {@link ByteUtil}.
+ */
+public class DoubleSerDeser implements EntitySerDeser<Double>{
+
+	/** Minimum number of bytes required to decode a double. */
+	private static final int SIZE = 8;
+
+	/**
+	 * Decodes a double from the given bytes.
+	 *
+	 * @param bytes serialized form; may be {@code null}
+	 * @return the decoded value, or {@code null} when the input is {@code null} or
+	 *         shorter than 8 bytes
+	 */
+	@Override
+	public Double deserialize(byte[] bytes){
+		// missing input is treated like too-short input instead of raising a NullPointerException
+		if(bytes == null || bytes.length < SIZE) {
+			return null;
+		}
+		return ByteUtil.bytesToDouble(bytes);
+	}
+
+	/**
+	 * Encodes the given value.
+	 *
+	 * @param obj value to encode; may be {@code null}
+	 * @return the encoded bytes, or {@code null} when {@code obj} is {@code null}
+	 */
+	@Override
+	public byte[] serialize(Double obj){
+		if(obj == null) {
+			return null;
+		}
+		return ByteUtil.doubleToBytes(obj);
+	}
+
+	/**
+	 * @return the value class handled by this serializer
+	 */
+	@Override
+	public Class<Double> type(){
+		return Double.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
new file mode 100644
index 0000000..930743e
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+/**
+ * Fixed time constants: one write time plus a read window that brackets it by
+ * one day on either side.
+ */
+public class EntityConstants {
+
+	/** Fixed write time in human-readable form. */
+	public static final String FIXED_WRITE_HUMANTIME = "1970-01-02 00:00:00";
+
+	/** Start of the fixed read window in human-readable form. */
+	public static final String FIXED_READ_START_HUMANTIME = "1970-01-01 00:00:00";
+
+	/** End of the fixed read window in human-readable form. */
+	public static final String FIXED_READ_END_HUMANTIME = "1970-01-03 00:00:00";
+
+	/**
+	 * {@link #FIXED_WRITE_HUMANTIME} converted to seconds by {@link DateTimeUtil} and
+	 * multiplied by 1000 (presumably epoch milliseconds - confirm against DateTimeUtil).
+	 */
+	public static final long FIXED_WRITE_TIMESTAMP =
+			1000 * DateTimeUtil.humanDateToSecondsWithoutException(FIXED_WRITE_HUMANTIME);
+
+}