You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:23 UTC
[16/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
new file mode 100755
index 0000000..8209445
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.EntityQualifierUtils;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.Qualifier;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.eagle.query.parser.*;
+import org.apache.hadoop.hbase.filter.*;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * the steps of building hbase filters
+ * 1. receive ORExpression from eagle-antlr
+ * 2. iterate all ANDExpression in ORExpression
+ * 2.1 build one filter list per ANDExpression and add it to an outer filter list with MUST_PASS_ONE option
+ * 2.2 iterate all AtomicExpression in ANDExpression
+ * 2.2.1 group AtomicExpression into 2 groups by looking up metadata, one is for tag filters, the other is for column filters
+ * 2.2.2 put the above 2 filters to a filter list with MUST_PASS_ALL option
+ */
+public class HBaseFilterBuilder {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class);
+
+ /**
+ * syntax is @<fieldname>
+ */
+// private static final String fnRegex = "^@(.*)$";
+ private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// Pattern.compile(fnRegex);
+ private static final Charset _defaultCharset = Charset.forName("ISO-8859-1");
+
+ private ORExpression _orExpr;
+ private EntityDefinition _ed;
+ private boolean _filterIfMissing;
+ private Charset _charset = _defaultCharset;
+
+ /**
+ * TODO: Verify performance impact
+ *
+ * @return the field names recorded by {@code buildExpressionBasedFilter}, or null when no expression based filter has been built
+ */
+ public Set<String> getFilterFields() {
+ return _filterFields;
+ }
+
+ /**
+ * Just add filter fields for expression filter
+ */
+ private Set<String> _filterFields;
+
+ public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) {
+ this(ed, orExpr, false); // delegate with filterIfMissing disabled
+ }
+
+ public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) {
+ this._ed = ed; // entity metadata, consulted to tell tags from qualifiers
+ this._orExpr = orExpr; // parsed query expression to translate into filters
+ this._filterIfMissing = filterIfMissing; // forwarded to SingleColumnValueFilter.setFilterIfMissing
+ }
+
+ public void setCharset(String charsetName){
+ _charset = Charset.forName(charsetName); // replaces the ISO-8859-1 default; Charset.forName rejects unknown names
+ }
+
+ public Charset getCharset(){
+ return _charset; // charset used when building regex comparators; ISO-8859-1 unless setCharset was called
+ }
+
+ /**
+ * Because we don't have metadata for tags, we regard any non-qualifier field as a tag. So a field is possibly not a real tag when this function returns true. This happens
+ * when a user inputs a wrong field name which is neither a tag nor a qualifier.
+ * @param field field name to check
+ * @return the result of {@link EntityDefinition#isTag(String)} for the field
+ */
+ private boolean isTag(String field){
+ return _ed.isTag(field);
+ }
+
+ /**
+ * check whether this field is one entity attribute or not
+ * @param fieldName
+ * @return
+ */
+ private String parseEntityAttribute(String fieldName){
+ Matcher m = _fnPattern.matcher(fieldName);
+ if(m.find()){
+ return m.group(1);
+ }
+ return null;
+ }
+
+ /**
+ * Return the partition values for each or expression. The size of the returned list should be equal to
+ * the size of FilterList that {@link #buildFilters()} returns.
+ *
+ * TODO: For now we don't support one query to query multiple partitions. In future if partition is defined,
+ * for the entity, internally We need to spawn multiple queries and send one query for each partition.
+ *
+ * @return one String[] per ANDExpression, ordered like the entity's partitions; a slot is null when the expression has
+ * no equality condition on that partition. Return null if the entity doesn't support partition
+ */
+ public List<String[]> getPartitionValues() {
+ final String[] partitions = _ed.getPartitions();
+ if (partitions == null || partitions.length == 0) {
+ return null;
+ }
+ final List<String[]> result = new ArrayList<String[]>();
+ final Map<String, String> partitionKeyValueMap = new HashMap<String, String>(); // reused for every ANDExpression
+ for(ANDExpression andExpr : _orExpr.getANDExprList()) {
+ partitionKeyValueMap.clear();
+ for(AtomicExpression ae : andExpr.getAtomicExprList()) {
+ // TODO temporarily ignore those fields which are not for attributes
+ if(ae.getKeyType() == TokenType.ID) {
+ final String fieldName = parseEntityAttribute(ae.getKey());
+ if (fieldName == null) {
+ LOG.warn(ae.getKey() + " field does not have format @<FieldName>, ignored"); // log the raw key; fieldName is null here
+ continue;
+ }
+ if (_ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) {
+ final String value = ae.getValue();
+ partitionKeyValueMap.put(fieldName, value); // a later equality on the same partition overrides an earlier one
+ }
+ }
+ }
+ final String[] values = new String[partitions.length];
+ result.add(values);
+ for (int i = 0; i < partitions.length; ++i) {
+ final String partition = partitions[i];
+ final String value = partitionKeyValueMap.get(partition);
+ values[i] = value;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @see org.apache.eagle.query.parser.TokenType
+ *
+ * @return a MUST_PASS_ONE FilterList holding one MUST_PASS_ALL FilterList per ANDExpression of the query
+ */
+ public FilterList buildFilters(){
+ // TODO: Optimize to select between row filter or column filter for better performance
+ // Use row key filter priority by default
+ boolean rowFilterPriority = true;
+
+ FilterList fltList = new FilterList(Operator.MUST_PASS_ONE);
+ for(ANDExpression andExpr : _orExpr.getANDExprList()){
+
+ FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+ Map<String, List<String>> tagFilters = new HashMap<String, List<String>>();
+ List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>();
+// List<QualifierFilterEntry> tagLikeQualifierFilters = new ArrayList<QualifierFilterEntry>();
+
+ // TODO refactor not to use too much if/else
+ for(AtomicExpression ae : andExpr.getAtomicExprList()){
+ // TODO temporarily ignore those fields which are not for attributes
+
+ String fieldName = ae.getKey();
+ if(ae.getKeyType() == TokenType.ID){
+ fieldName = parseEntityAttribute(fieldName);
+ if(fieldName == null){
+ LOG.warn(ae.getKey() + " field does not have format @<FieldName>, ignored"); // log the raw key; fieldName is null here
+ continue;
+ }
+ }
+
+ String value = ae.getValue();
+ ComparisonOperator op = ae.getOp();
+ TokenType keyType = ae.getKeyType();
+ TokenType valueType = ae.getValueType();
+ QualifierFilterEntity entry = new QualifierFilterEntity(fieldName,value,op,keyType,valueType);
+
+ // TODO Exact match, need to add escape for those special characters here, including:
+ // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|"
+
+ if(keyType == TokenType.ID && isTag(fieldName)){
+ if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op))
+ && !TokenType.NULL.equals(valueType))
+ {
+ // Use RowFilter for equal TAG
+ if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
+ tagFilters.get(fieldName).add(value);
+ } else if (rowFilterPriority && ComparisonOperator.IN.equals(op))
+ {
+ // Use RowFilter here by default
+ if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>());
+ tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value));
+ } else if (ComparisonOperator.LIKE.equals(op)
+ || ComparisonOperator.NOT_LIKE.equals(op)
+ || ComparisonOperator.CONTAINS.equals(op)
+ || ComparisonOperator.NOT_CONTAINS.equals(op)
+ || ComparisonOperator.IN.equals(op)
+ || ComparisonOperator.IS.equals(op)
+ || ComparisonOperator.IS_NOT.equals(op)
+ || ComparisonOperator.NOT_EQUAL.equals(op)
+ || ComparisonOperator.EQUAL.equals(op)
+ || ComparisonOperator.NOT_IN.equals(op))
+ {
+ qualifierFilters.add(entry);
+ } else
+ {
+ LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName + " yet, rejecting the query");
+ throw new IllegalArgumentException("Don't support operation: "+op+" on tag field: "+fieldName+", available options: =, !=, =~, !=~, in, not in, contains, not contains");
+ }
+ }else{
+ qualifierFilters.add(entry);
+ }
+ }
+
+ // Build RowFilter for equal tags
+ list.addFilter(buildTagFilter(tagFilters));
+
+ // Build SingleColumnValueFilter
+ FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters);
+ if(qualifierFilterList != null && qualifierFilterList.getFilters().size()>0){
+ list.addFilter(qualifierFilterList);
+ }else {
+ if(LOG.isDebugEnabled()) LOG.debug("Ignore empty qualifier filter from "+qualifierFilters.toString());
+ }
+ fltList.addFilter(list);
+ }
+ LOG.info("Query: " + _orExpr.toString() + " => Filter: " + fltList.toString());
+ return fltList;
+ }
+
+ /**
+ * Build the RowFilter that matches row keys against the regex from {@link #buildTagFilterRegex(Map)}.
+ * The tag regex is assembled from hash code bytes, one char per byte, so the comparator has to decode
+ * row keys as ISO-8859-1 no matter which charset was configured through {@link #setCharset(String)}.
+ * Note: regex comparison is to compare String
+ */
+ protected Filter buildTagFilter(Map<String, List<String>> tagFilters){
+ RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters));
+ regexStringComparator.setCharset(_defaultCharset);
+ RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator);
+ return filter;
+ }
+
+ /**
+ * all qualifiers' condition must be satisfied.
+ * NULL values ([=, !=, is, is not] null) use NullComparator; expression based conditions use RowValueFilter.
+ * <H1>Use RegexStringComparator for:</H1>
+ * LIKE
+ * NOT_LIKE
+ * (IN / NOT_IN are delegated to buildListQualifierFilter, which uses BinaryComparator)
+ *
+ * <H1>Use SubstringComparator for:</H1>
+ * CONTAINS, NOT_CONTAINS
+ *
+ * <H1>Use BinaryComparator or TypedByteArrayComparator (bytes from EntityQualifierUtils) for:</H1>
+ * EQUALS
+ * NOT_EQUALS
+ * LESS
+ * LESS_OR_EQUAL
+ * GREATER
+ * GREATER_OR_EQUAL
+ *
+ * <H2>
+ * TODO: Compare performance of RegexStringComparator ,SubstringComparator ,EntityQualifierHelper
+ * </H2>
+ *
+ * @param qualifierFilters qualifier / expression conditions collected from one ANDExpression
+ * @return a MUST_PASS_ALL FilterList with the filters built for the conditions; empty if there are none
+ */
+ protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters){
+ FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+ // iterate all the qualifiers
+ for(QualifierFilterEntity entry : qualifierFilters){
+ // if contains expression based filter
+ if(entry.getKeyType() == TokenType.EXP
+ || entry.getValueType() == TokenType.EXP
+ || entry.getKeyType() != TokenType.ID){
+ if(!EagleConfigFactory.load().isCoprocessorEnabled()) {
+ LOG.warn("Expression in filter may not support, because custom filter and coprocessor is disabled: " + entry.toString());
+ }
+ list.addFilter(buildExpressionBasedFilter(entry));
+ continue;
+ }
+
+ // else using SingleColumnValueFilter
+ String qualifierName = entry.getKey();
+ if(!isTag(entry.getKey())){
+ Qualifier qualifier = _ed.getDisplayNameMap().get(entry.getKey());
+ qualifierName = qualifier.getQualifierName();
+ }
+
+ // Comparator to be used for building HBase Filter
+ // WritableByteArrayComparable comparator;
+ ByteArrayComparable comparable;
+ if(ComparisonOperator.IN.equals(entry.getOp())
+ || ComparisonOperator.NOT_IN.equals(entry.getOp())){
+ Filter setFilter = buildListQualifierFilter(entry);
+ if(setFilter!=null){
+ list.addFilter(setFilter);
+ }
+ }else{
+ // If [=,!=,is,is not] NULL, use NullComparator else throw exception
+ if(TokenType.NULL.equals(entry.getValueType())){
+ if(ComparisonOperator.EQUAL.equals(entry.getOp())
+ ||ComparisonOperator.NOT_EQUAL.equals(entry.getOp())
+ ||ComparisonOperator.IS.equals(entry.getOp())
+ ||ComparisonOperator.IS_NOT.equals(entry.getOp()))
+ comparable = new NullComparator();
+ else
+ throw new IllegalArgumentException("Operation: "+entry.getOp()+" with NULL is not supported yet: "+entry.toString()+", avaliable options: [=, !=, is, is not] null|NULL");
+ }
+ // If [contains, not contains],use SubstringComparator
+ else if (ComparisonOperator.CONTAINS.equals(entry.getOp())
+ || ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) {
+ comparable = new SubstringComparator(entry.getValue());
+ }
+ // If [like, not like], use RegexStringComparator
+ else if (ComparisonOperator.LIKE.equals(entry.getOp())
+ || ComparisonOperator.NOT_LIKE.equals(entry.getOp())){
+ // Use RegexStringComparator for LIKE / NOT_LIKE
+ RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry.getValue()));
+ _comparator.setCharset(_charset);
+ comparable = _comparator;
+ } else{
+ Class type = EntityQualifierUtils.getType(_ed, entry.getKey());
+ // if type is null (is Tag or not found) or not defined for TypedByteArrayComparator
+ if(!EagleConfigFactory.load().isCoprocessorEnabled() || type == null || TypedByteArrayComparator.get(type) == null){
+ comparable = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()));
+ }else {
+ comparable = new TypedByteArrayComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()),type);
+ }
+ }
+
+ SingleColumnValueFilter filter =
+ new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparable);
+ filter.setFilterIfMissing(_filterIfMissing);
+ list.addFilter(filter);
+ }
+ }
+
+ return list;
+ }
+
+ private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) {
+ BooleanExpressionComparator expressionComparator = new BooleanExpressionComparator(entry,_ed);
+ if(_filterFields == null) _filterFields = new HashSet<String>(); // accumulate: one query may hold several expression conditions
+ _filterFields.addAll(expressionComparator.getRequiredFields()); // plain assignment kept only the last condition's fields
+ return new RowValueFilter(expressionComparator);
+ }
+
+ /**
+ * Build one SingleColumnValueFilter with a BinaryComparator per value of the list (currently BinaryComparator only)
+ * <h2>TODO: </h2>
+ * Possibility to tune performance by using: OR[BinaryComparator,...] instead of RegexStringComparator?
+ *
+ *<br/> <br/>
+ * IN combines the per-value filters with MUST_PASS_ONE, NOT_IN combines them with MUST_PASS_ALL.
+ * ! Check op must be IN or NOT_IN in caller, any other operator raises IllegalArgumentException
+ *
+ * @param entry condition whose value is split into a list by EntityQualifierUtils.parseList
+ * @return the FilterList holding the per-value filters
+ */
+ private Filter buildListQualifierFilter(QualifierFilterEntity entry){
+ List<String> valueSet = EntityQualifierUtils.parseList(entry.getValue());
+ Iterator<String> it = valueSet.iterator();
+ String fieldName = entry.getKey();
+ String qualifierName = fieldName;
+ if(!_ed.isTag(entry.getKey())){
+ qualifierName = _ed.getDisplayNameMap().get(entry.getKey()).getQualifierName();
+ }
+
+// TODO: Try to use RegExp just work if possible
+// Because single SingleColumnValueFilter is much faster than multi SingleColumnValueFilters in OR list.
+// Class qualifierType = EntityQualifierHelper.getType(_ed,fieldName);
+// if( qualifierType == null || qualifierType == String.class){
+// boolean first = true;
+// StringBuilder filterRegex = new StringBuilder();
+// filterRegex.append("^(");
+// while(it.hasNext()) {
+// String value = it.next();
+// if(value == null) {
+// logger.warn("ignore empty value in set qualifier filter: "+entry.toString());
+// continue;
+// }
+// if(!first) filterRegex.append("|");
+// filterRegex.append(value);
+// first = false;
+// }
+// filterRegex.append(")$");
+// RegexStringComparator regexStringComparator = new RegexStringComparator(filterRegex.toString());
+// return new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(),
+// convertToHBaseCompareOp(entry.getOp()), regexStringComparator);
+// }else{
+ FilterList setFilterList;
+ if(ComparisonOperator.IN.equals(entry.getOp())){
+ setFilterList = new FilterList(Operator.MUST_PASS_ONE);
+ }else if(ComparisonOperator.NOT_IN.equals(entry.getOp())) {
+ setFilterList = new FilterList(Operator.MUST_PASS_ALL);
+ }else{
+ throw new IllegalArgumentException(String.format("Don't support operation: %s on LIST type of value yet: %s, valid options: IN/NOT IN [LIST]",entry.getOp(),entry.toString()));
+ }
+
+ while(it.hasNext()) {
+ String value = it.next();
+ BinaryComparator comparator = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, fieldName, value));
+ SingleColumnValueFilter filter =
+ new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparator);
+ filter.setFilterIfMissing(_filterIfMissing);
+ setFilterList.addFilter(filter);
+ }
+
+ return setFilterList;
+// }
+ }
+
+ /**
+ * Just used for LIKE and NOT_LIKE
+ *
+ * @param qualifierValue
+ * @return
+ */
+ protected String buildQualifierRegex(String qualifierValue){
+ StringBuilder sb = new StringBuilder();
+// sb.append("(?s)");
+ sb.append("^");
+ sb.append(qualifierValue);
+ sb.append("$");
+ return sb.toString();
+ }
+
+ /**
+ * Appends the given ID to the given buffer as a quoted regex literal: "\\Q", one char per byte, then "\\E".
+ * [steal it from opentsdb, thanks opentsdb :) https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java]
+ */
+ private static void addId(final StringBuilder buf, final byte[] id) {
+ buf.append("\\Q");
+ boolean backslash = false;
+ for (final byte b : id) {
+ buf.append((char) (b & 0xFF));
+ if (b == 'E' && backslash) { // If we saw a `\' and now we have a `E'.
+ // We just terminated the quoted section by adding \E to `buf', so put a literal \E now and start quoting again.
+ buf.append("\\\\E\\Q");
+ backslash = false; // the `\' is consumed; an `E' right after it is plain data and must not be escaped again
+ } else {
+ backslash = b == '\\';
+ }
+ }
+ buf.append("\\E");
+ }
+
+ @SuppressWarnings("unused")
+ private static void addId(final StringBuilder buf, final String id) {
+ buf.append("\\Q");
+ int len = id.length(); // quote every char of id; the former length()-1 bound silently dropped the last one
+ boolean backslash = false;
+ for (int i =0; i < len; i++) {
+ char c = id.charAt(i);
+ buf.append(c);
+ if (c == 'E' && backslash) { // If we saw a `\' and now we have a `E'.
+ // We just terminated the quoted section by adding \E to `buf', so put a literal \E now and start quoting again.
+ buf.append("\\\\E\\Q");
+ backslash = false; // the `\' is consumed; an `E' right after it is plain data and must not be escaped again
+ } else {
+ backslash = c == '\\';
+ }
+ }
+ buf.append("\\E");
+ }
+
+ /**
+ * one search tag may have multiple values which have OR relationship, and relationship between
+ * different search tags is AND
+ * the query is like "(TAG1=value11 OR TAG1=value12) AND TAG2=value2"
+ * @param tags tag name to accepted values; partition tags are skipped, names and values are matched by their hashCode()
+ * @return regex over the row key: fixed-length header, then the hashed tag/value pairs in ascending tag-hash order with any number of 8-char groups in between
+ */
+ protected String buildTagFilterRegex(Map<String, List<String>> tags){
+ // TODO need consider that \E could be part of tag, refer to https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java
+ final SortedMap<Integer, List<Integer>> tagHash = new TreeMap<Integer, List<Integer>>();
+ final int numOfPartitionFields = (_ed.getPartitions() == null) ? 0 : _ed.getPartitions().length;
+ for(Map.Entry<String, List<String>> entry : tags.entrySet()){
+ String tagName = entry.getKey();
+ // Ignore tag if the tag is one of partition fields
+ if (_ed.isPartitionTag(tagName)) {
+ continue;
+ }
+ List<String> stringValues = entry.getValue();
+ List<Integer> hashValues = new ArrayList<Integer>(stringValues.size());
+ for(String value : stringValues){
+ hashValues.add(value.hashCode());
+ }
+ tagHash.put(tagName.hashCode(), hashValues);
+ }
+
+ // header = prefix(4 bytes) + partition_hashes(4*N bytes) + timestamp (8 bytes)
+ final int headerLength = 4 + numOfPartitionFields * 4 + 8;
+
+ // <tag1:4><value1:4> ... <tagn:4><valuen:4>
+ StringBuilder sb = new StringBuilder();
+ sb.append("(?s)");
+ sb.append("^(?:.{").append(headerLength).append("})");
+ sb.append("(?:.{").append(8).append("})*"); // for any number of tags
+ for (Map.Entry<Integer, List<Integer>> entry : tagHash.entrySet()) {
+ try {
+ addId(sb, ByteUtil.intToBytes(entry.getKey()));
+ List<Integer> hashValues = entry.getValue();
+ sb.append("(?:");
+ boolean first = true;
+ for(Integer value : hashValues){
+ if(!first){
+ sb.append('|');
+ }
+ addId(sb, ByteUtil.intToBytes(value));
+ first = false;
+ }
+ sb.append(")");
+ sb.append("(?:.{").append(8).append("})*"); // for any number of tags
+ } catch (Exception ex) {
+ LOG.error("constructing regex error", ex);
+ }
+ }
+ sb.append("$");
+ if(LOG.isDebugEnabled()) LOG.debug("Tag filter pattern is " + sb.toString());
+ return sb.toString();
+ }
+
+ /**
+ * Map a query ComparisonOperator onto the native HBase CompareOp.
+ *
+ * EQUAL, LIKE, CONTAINS, IN, IS become EQUAL; their negated forms become NOT_EQUAL;
+ * LESS, LESS_OR_EQUAL, GREATER, GREATER_OR_EQUAL keep their meaning.
+ *
+ * @param comp operator taken from the parsed query
+ * @return the matching CompareOp; any other operator (or null) raises IllegalArgumentException
+ */
+ protected static CompareOp convertToHBaseCompareOp(ComparisonOperator comp) {
+ // operators that pass when the comparator matches
+ if(comp == ComparisonOperator.EQUAL || comp == ComparisonOperator.LIKE || comp == ComparisonOperator.CONTAINS
+ || comp == ComparisonOperator.IN || comp == ComparisonOperator.IS) {
+ return CompareOp.EQUAL;
+ }
+ // negated forms of the operators above
+ if(comp == ComparisonOperator.NOT_EQUAL || comp == ComparisonOperator.NOT_LIKE || comp == ComparisonOperator.NOT_CONTAINS
+ || comp == ComparisonOperator.IS_NOT || comp == ComparisonOperator.NOT_IN) {
+ return CompareOp.NOT_EQUAL;
+ }
+ // ordering operators map one-to-one
+ if(comp == ComparisonOperator.LESS) {
+ return CompareOp.LESS;
+ }
+ if(comp == ComparisonOperator.LESS_OR_EQUAL) {
+ return CompareOp.LESS_OR_EQUAL;
+ }
+ if(comp == ComparisonOperator.GREATER) {
+ return CompareOp.GREATER;
+ }
+ if(comp == ComparisonOperator.GREATER_OR_EQUAL) {
+ return CompareOp.GREATER_OR_EQUAL;
+ }
+ LOG.error("{} operation is not supported now\n", comp);
+ throw new IllegalArgumentException("Illegal operation: "+comp+ ", avaliable options: "+ Arrays.toString(ComparisonOperator.values()));
+ }
+
+ protected static CompareOp getHBaseCompareOp(String comp) {
+ return convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp)); // resolve the textual operator first, then map it
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
new file mode 100755
index 0000000..6cdc77b
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import org.apache.eagle.query.parser.ComparisonOperator;
+import org.apache.eagle.query.parser.TokenType;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class QualifierFilterEntity implements Writable{ // one "key op value" condition, serializable as a Hadoop Writable
+ public String key;
+ public String value;
+ public ComparisonOperator op;
+ public TokenType valueType;
+ public TokenType keyType;
+
+ public QualifierFilterEntity(){} // no-arg constructor; fields are filled later by setters or readFields
+ public QualifierFilterEntity(String key, String value, ComparisonOperator comp, TokenType keyType, TokenType valueType) {
+ super();
+ this.key = key;
+ this.value = value;
+ this.op = comp;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public ComparisonOperator getOp() {
+ return op;
+ }
+
+ public void setOp(ComparisonOperator op) {
+ this.op = op;
+ }
+
+ public TokenType getValueType() {
+ return valueType;
+ }
+
+ public void setValueType(TokenType valueType) {
+ this.valueType = valueType;
+ }
+
+ public void setKeyType(TokenType keyType){
+ this.keyType = keyType;
+ }
+ public TokenType getKeyType(){
+ return this.keyType;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s %s %s",this.key,this.op,this.value); // rendered as "key op value"
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // field order is the wire format and must mirror readFields: key, value, op, keyType, valueType
+ out.writeUTF(this.key); // NOTE(review): a null key/value/op/type would fail here - confirm callers never leave them null
+ out.writeUTF(this.getValue());
+ out.writeUTF(this.op.name());
+ out.writeUTF(this.keyType.name());
+ out.writeUTF(this.valueType.name());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.key = in.readUTF(); // read back in exactly the order used by write
+ this.value = in.readUTF();
+ this.op = ComparisonOperator.valueOf(in.readUTF()); // enums travel as their name()
+ this.keyType = TokenType.valueOf(in.readUTF());
+ this.valueType = TokenType.valueOf(in.readUTF());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
new file mode 100755
index 0000000..a4b97ea
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Row level filter that hands the KeyValues of a row to a WritableComparable and drops the row when it compares as 0.
+ * TODO: Critical performance problem!!!
+ * TODO: Refactor to specified multi-column filter so that avoid return all qualifier columns from region server to client side
+ * @since 2014/11/17
+ */
+public class RowValueFilter extends FilterBase {
+ private final static Logger LOG = LoggerFactory.getLogger(RowValueFilter.class);
+ private boolean filterOutRow = false;
+ private WritableComparable<List<KeyValue>> comparator;
+
+ // TODO: Use qualifiers to reduce network transfer
+// private List<byte[]> qualifiers;
+ public RowValueFilter(){}
+
+ /**
+ * Filter out the row if WritableComparable.compareTo returns 0
+ * @param comparator <code>WritableComparable[List[KeyValue]]</code>
+ */
+ public RowValueFilter(WritableComparable<List<KeyValue>> comparator){
+ this.comparator = comparator;
+ }
+
+// public RowValueFilter(List<byte[]> qualifiers,WritableComparable<List<KeyValue>> comparator){
+// this.qualifiers = qualifiers;
+// this.comparator = comparator;
+// }
+
+ /**
+ * Old interface in hbase-0.94
+ *
+ * @param out target the comparator is written to
+ * @throws IOException if the comparator fails to write itself
+ */
+ @Deprecated
+ public void write(DataOutput out) throws IOException {
+ this.comparator.write(out);
+ }
+
+ /**
+ * Old interface in hbase-0.94; always restores the comparator as a BooleanExpressionComparator
+ *
+ * @param in source the comparator state is read from
+ * @throws IOException if the comparator fails to read itself
+ */
+// @Override
+ @Deprecated
+ public void readFields(DataInput in) throws IOException {
+ this.comparator = new BooleanExpressionComparator();
+ this.comparator.readFields(in);
+ }
+
+ /**
+ * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into Protobuf based
+ *
+ * @return the bytes the comparator writes through its Writable interface
+ * @throws IOException if the comparator fails to write itself
+ */
+ @Override
+ public byte[] toByteArray() throws IOException {
+ ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput();
+ this.comparator.write(byteArrayDataOutput);
+ return byteArrayDataOutput.toByteArray();
+ }
+
+ /**
+ * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into Protobuf based
+ */
+ // Override static method
+ public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException {
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbBytes);
+ RowValueFilter filter = new RowValueFilter();
+ try {
+ filter.readFields(byteArrayDataInput);
+ } catch (IOException e) {
+ LOG.error("Got error to deserialize RowValueFilter from PB bytes",e);
+ throw new DeserializationException(e);
+ }
+ return filter;
+ }
+
+ @Override
+ public boolean hasFilterRow(){
+ return true;
+ }
+
+ @Override
+ public void filterRow(List<KeyValue> row) {
+ filterOutRow = (this.comparator.compareTo(row) == 0);
+ }
+
+ @Override
+ public void reset() {
+ this.filterOutRow = false;
+ }
+
+ @Override
+ public boolean filterRow(){
+ return filterOutRow;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString()+" ( "+this.comparator.toString()+" )";
+ }
+
+// public List<byte[]> getQualifiers() {
+// return qualifiers;
+// }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
new file mode 100755
index 0000000..ecaf8cc
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.filter;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.io.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * <h1>TypedByteArrayComparator</h1>
+ *
+ * Compare byte array: <code>byte[] value</code> with class type: <code>Class type</code>
+ *
+ * <br/>
+ * <br/>
+ * Built-in support:
+ *
+ * <pre>
+ * Double
+ * double
+ * Integer
+ * int
+ * Long
+ * long
+ * Short
+ * short
+ * Boolean
+ * boolean
+ * </pre>
+ *
+ * And can be extend by defining new {@link RawComparator} and register with {@link #define(Class type, RawComparator comparator)}
+ * <br/>
+ * <br/>
+ */
+public class TypedByteArrayComparator extends ByteArrayComparable {
+ private final static Logger LOG = LoggerFactory.getLogger(TypedByteArrayComparator.class);
+
+ private Class type;
+
+ // Not need to be writable
+ private RawComparator comparator;
+
+ /**
+ * Default constructor for writable
+ */
+ @SuppressWarnings("unused")
+ public TypedByteArrayComparator(){
+ super(null);
+ }
+
+ public TypedByteArrayComparator(byte[] value, Class type){
+ super(value);
+ this.type = type;
+ this.comparator = get(this.type);
+ if(this.comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type);
+ }
+
+ /**
+ * @param in hbase-0.94 interface
+ * @throws IOException
+ */
+// @Override
+ public void readFields(DataInput in) throws IOException {
+// super.readFields(in);
+ try {
+ String _type = in.readUTF();
+ type = _primitiveTypeClassMap.get(_type);
+ if(type == null) {
+ type = Class.forName(_type);
+ }
+ comparator = get(type);
+ if(comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e.getMessage(),e);
+ }
+ }
+
+ /**
+ * @param out hbase-0.94 interface
+ * @throws IOException
+ */
+// @Override
+ public void write(DataOutput out) throws IOException {
+// super.write(out);
+ String typeName = type.getName();
+ out.writeUTF(typeName);
+ }
+
+ /**
+ * For hbase 0.98
+ *
+ * @return serialized byte array
+ */
+ @Override
+ public byte[] toByteArray() {
+ ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput();
+ try {
+ this.write(byteArrayDataOutput);
+ return byteArrayDataOutput.toByteArray();
+ } catch (IOException e) {
+ LOG.error("Failed to serialize due to: "+e.getMessage(),e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * For hbase 0.98
+ *
+ * @param bytes raw byte array
+ * @return Comparator instance
+ * @throws DeserializationException
+ */
+ public static TypedByteArrayComparator parseFrom(final byte [] bytes)
+ throws DeserializationException {
+ TypedByteArrayComparator comparator = new TypedByteArrayComparator();
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(bytes);
+ try {
+ comparator.readFields(byteArrayDataInput);
+ } catch (IOException e) {
+ LOG.error("Got error to deserialize TypedByteArrayComparator from PB bytes",e);
+ throw new DeserializationException(e);
+ }
+ return comparator;
+ }
+
+ @Override
+ public int compareTo(byte[] value, int offset, int length) {
+ return this.comparator.compare(this.getValue(), 0, this.getValue().length, value, offset, length);
+ }
+
+ /**
+ * <ol>
+ * <li>Try registered comparator</li>
+ * <li>If not found, try all possible WritableComparator</li>
+ * </ol>
+ *
+ * If not found finally, throw new IllegalArgumentException("unable to get comparator for class: "+type);
+ *
+ * @param type value type class
+ * @return RawComparator
+ */
+ public static RawComparator get(Class type){
+ RawComparator comparator = null;
+ try {
+ comparator = _typedClassComparator.get(type);
+ }catch (ClassCastException ex){
+ // ignore
+ }
+ try {
+ if (comparator == null) comparator = WritableComparator.get(type);
+ }catch (ClassCastException ex){
+ // ignore
+ }
+ return comparator;
+ }
+
+ private final static Map<Class,RawComparator> _typedClassComparator = new HashMap<Class, RawComparator>();
+ public static void define(Class type, RawComparator comparator){
+ _typedClassComparator.put(type,comparator);
+ }
+
+ static{
+ define(Double.class, WritableComparator.get(DoubleWritable.class));
+ define(double.class, WritableComparator.get(DoubleWritable.class));
+ define(Integer.class, WritableComparator.get(IntWritable.class));
+ define(int.class, WritableComparator.get(IntWritable.class));
+ define(Long.class, WritableComparator.get(LongWritable.class));
+ define(long.class, WritableComparator.get(LongWritable.class));
+ define(Short.class, WritableComparator.get(ShortWritable.class));
+ define(short.class, WritableComparator.get(ShortWritable.class));
+ define(Boolean.class, WritableComparator.get(BooleanWritable.class));
+ define(boolean.class, WritableComparator.get(BooleanWritable.class));
+ }
+
+ /**
+ * Because {@link Class#forName } can't find class for primitive type
+ */
+ private final static Map<String,Class> _primitiveTypeClassMap = new HashMap<String, Class>();
+ static {
+ _primitiveTypeClassMap.put(int.class.getName(),int.class);
+ _primitiveTypeClassMap.put(double.class.getName(),double.class);
+ _primitiveTypeClassMap.put(long.class.getName(),long.class);
+ _primitiveTypeClassMap.put(short.class.getName(),short.class);
+ _primitiveTypeClassMap.put(boolean.class.getName(),boolean.class);
+ _primitiveTypeClassMap.put(char.class.getName(),char.class);
+ _primitiveTypeClassMap.put(byte.class.getName(),byte.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
new file mode 100755
index 0000000..418ab33
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+
+/**
+ * Base class for index-backed {@link LogReader}s; it only contributes a shared helper.
+ */
+public abstract class IndexLogReader implements LogReader {
+
+    /**
+     * Adds to {@code get} the column(s) referenced by every {@link SingleColumnValueFilter}
+     * found in {@code filter}, descending recursively into {@link FilterList}s.
+     * When {@code qualifiers} is {@code null} the filter's whole family is added; otherwise only
+     * the filter's family/qualifier pair is added. Other filter types (and {@code null}) are ignored.
+     *
+     * TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. A more graceful implementation
+     * would use SingleColumnValueExcludeFilter, but that is complicated in the current implementation.
+     *
+     * @param get        request to extend
+     * @param filter     filter (tree) to inspect; may be {@code null}
+     * @param qualifiers output qualifiers of the query; only checked for {@code null} here
+     */
+    protected static void workaroundHBASE2198(Get get, Filter filter,byte[][] qualifiers) {
+        if (filter instanceof SingleColumnValueFilter) {
+            final SingleColumnValueFilter columnFilter = (SingleColumnValueFilter) filter;
+            if (qualifiers != null) {
+                get.addColumn(columnFilter.getFamily(), columnFilter.getQualifier());
+            } else {
+                get.addFamily(columnFilter.getFamily());
+            }
+        } else if (filter instanceof FilterList) {
+            for (Filter child : ((FilterList) filter).getFilters()) {
+                workaroundHBASE2198(get, child, qualifiers);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
new file mode 100755
index 0000000..9e059f2
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.*;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Stream reader that pulls {@link InternalLog}s from an index-backed {@link LogReader},
+ * converts each one into an entity and hands it to the registered listeners.
+ */
+public abstract class IndexStreamReader extends StreamReader {
+    protected final IndexDefinition indexDef;
+    protected final SearchCondition condition;
+    protected final List<byte[]> indexRowkeys;
+    // Created lazily by readAsStream() through createIndexReader()
+    protected LogReader<InternalLog> reader;
+    // Largest / smallest entity timestamp seen so far; 0 is treated as "not set yet"
+    protected long lastTimestamp = 0;
+    protected long firstTimestamp = 0;
+
+    protected static final Logger LOG = LoggerFactory.getLogger(IndexStreamReader.class);
+
+    /**
+     * @param indexDef     index definition the query goes through
+     * @param condition    search condition (output alias, verbosity, page size, ...)
+     * @param indexRowkeys index row keys to read
+     */
+    public IndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+        this.indexDef = indexDef;
+        this.condition = condition;
+        this.indexRowkeys = indexRowkeys;
+        this.reader = null;
+    }
+
+    @Override
+    public long getLastTimestamp() {
+        return this.lastTimestamp;
+    }
+
+    @Override
+    public long getFirstTimestamp() {
+        return this.firstTimestamp;
+    }
+
+    /**
+     * Opens the reader, emits every entity to the listeners until the reader is exhausted or the
+     * number of emitted entities equals {@code condition.getPageSize()}, and always closes the reader.
+     *
+     * @throws Exception any failure; an {@link IOException} is logged before being rethrown
+     */
+    @Override
+    public void readAsStream() throws Exception {
+        if (reader == null) {
+            reader = createIndexReader();
+        }
+        final EntityDefinition entityDef = indexDef.getEntityDefinition();
+        try {
+            reader.open();
+            int emitted = 0;
+            for (InternalLog log = reader.read(); log != null; log = reader.read()) {
+                final TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef);
+                entity.setSerializeAlias(condition.getOutputAlias());
+                entity.setSerializeVerbose(condition.isOutputVerbose());
+
+                trackTimestamps(entity);
+                for (EntityCreationListener listener : _listeners) {
+                    listener.entityCreated(entity);
+                }
+                // Equality check: a page size the counter never hits does not stop the loop.
+                if (++emitted == condition.getPageSize()) {
+                    break;
+                }
+            }
+        } catch (IOException ioe) {
+            LOG.error("Fail reading log", ioe);
+            throw ioe;
+        } finally {
+            reader.close();
+        }
+    }
+
+    /** Widens the [firstTimestamp, lastTimestamp] range so that it covers the entity's timestamp. */
+    private void trackTimestamps(TaggedLogAPIEntity entity) {
+        if (lastTimestamp == 0 || lastTimestamp < entity.getTimestamp()) {
+            lastTimestamp = entity.getTimestamp();
+        }
+        if (firstTimestamp == 0 || firstTimestamp > entity.getTimestamp()) {
+            firstTimestamp = entity.getTimestamp();
+        }
+    }
+
+    /**
+     * @return the reader that {@link #readAsStream()} will open, read and close
+     */
+    protected abstract LogReader createIndexReader();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
new file mode 100755
index 0000000..e6a5c96
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.common.ByteUtil;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.Filter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Reads entities through a non-clustered index.
+ *
+ * For every index row key a {@link Scan} over the range
+ * {@code [rowkey, rowkey + MAX_TAG_VALUE_BYTES)} is prepared. The qualifier names found in the
+ * scanned index rows are used as entity row keys, which are then fetched in batches with
+ * multi-gets and returned one by one from {@link #read()}.
+ */
+public class NonClusteredIndexLogReader extends IndexLogReader {
+    private final IndexDefinition indexDef;
+    private final List<byte[]> indexRowkeys;
+    private final byte[][] qualifiers;
+    private final Filter filter;
+    private HTableInterface tbl;
+    private boolean isOpen = false;
+    // Current batch of entity results and the position of the next one to hand out
+    private Result[] results;
+    private int index = -1;
+    private final List<Scan> scans;
+    private int currentScanIndex = 0;
+    private ResultScanner currentResultScanner;
+
+    // Max tag key/value.
+    private static final byte[] MAX_TAG_VALUE_BYTES = {(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF};
+    // Target number of entity Gets collected before a multi-get is issued
+    private static final int BATCH_MULTIGET_SIZE = 1000;
+
+    /**
+     * @param indexDef     index definition; also provides table and column family
+     * @param indexRowkeys index row keys, one scan is built per key
+     * @param qualifiers   entity qualifiers to return, or {@code null} for the whole family
+     * @param filter       optional filter applied to the entity Gets; may be {@code null}
+     */
+    public NonClusteredIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) {
+        this.indexDef = indexDef;
+        this.indexRowkeys = indexRowkeys;
+        this.qualifiers = qualifiers;
+        this.filter = filter;
+        this.scans = buildScans();
+    }
+
+
+    /** Builds one index scan per index row key. */
+    private List<Scan> buildScans() {
+        final ArrayList<Scan> result = new ArrayList<Scan>(indexRowkeys.size());
+        for (byte[] rowkey : indexRowkeys) {
+            Scan s = new Scan();
+            s.setStartRow(rowkey);
+            // In rowkey the tag key/value is sorted by the hash code of the key, so MAX_TAG_VALUE_BYTES is enough as the end key
+            final byte[] stopRowkey = ByteUtil.concat(rowkey, MAX_TAG_VALUE_BYTES);
+            s.setStopRow(stopRowkey);
+            // TODO the # of cached rows should be minimum of (pagesize and 100)
+            int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize();
+            s.setCaching(cs);
+            // TODO not optimized for all applications
+            s.setCacheBlocks(true);
+            // scan specified columnfamily for all qualifiers
+            s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes());
+            result.add(s);
+        }
+        return result;
+    }
+
+    /**
+     * Acquires the table, starts the first index scan and loads the first batch of results.
+     * Calling it again while the reader is open is a no-op.
+     *
+     * @throws IOException if the table cannot be obtained or scanning/fetching fails
+     */
+    @Override
+    public void open() throws IOException {
+        if (isOpen)
+            return; // silently return
+        try {
+            tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
+        } catch (RuntimeException ex) {
+            throw new IOException(ex);
+        }
+        currentScanIndex = 0;
+        openNewScan();
+        fillResults();
+        // Mark as open only after setup succeeded, so the guard above actually takes effect
+        // (the other IndexLogReader implementations in this package do the same).
+        isOpen = true;
+    }
+
+    /**
+     * Closes the current scanner (if any) and opens the next prepared scan.
+     *
+     * @return {@code false} if all scans have been consumed, {@code true} otherwise
+     */
+    private boolean openNewScan() throws IOException {
+        closeCurrentScanResult();
+        if (currentScanIndex >= scans.size()) {
+            return false;
+        }
+        final Scan scan = scans.get(currentScanIndex++);
+        currentResultScanner = tbl.getScanner(scan);
+        return true;
+    }
+
+    /**
+     * Collects entity row keys from the index scans until roughly {@link #BATCH_MULTIGET_SIZE}
+     * Gets are gathered or the scans are exhausted, then fetches them with one multi-get.
+     * Leaves {@code results} as {@code null} when no entity row key could be collected.
+     */
+    private void fillResults() throws IOException {
+        if (currentResultScanner == null) {
+            return;
+        }
+        index = 0;
+        int count = 0;
+        Result r;
+        final List<Get> gets = new ArrayList<Get>(BATCH_MULTIGET_SIZE);
+        final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes();
+        while (count < BATCH_MULTIGET_SIZE) {
+            r = currentResultScanner.next();
+            if (r == null) {
+                // Current scan is exhausted: move on to the next one, or stop if none is left
+                if (openNewScan()) {
+                    continue;
+                } else {
+                    break;
+                }
+            }
+            final java.util.NavigableMap<byte[], byte[]> entityRowkeys = r.getFamilyMap(family);
+            if (entityRowkeys == null) {
+                // Defensive: nothing usable in this index row (same guard as UniqueIndexLogReader)
+                continue;
+            }
+            // Each qualifier name of the index row is the row key of an entity
+            for (byte[] rowkey : entityRowkeys.keySet()) {
+                if (rowkey.length == 0) { // invalid rowkey
+                    continue;
+                }
+                final Get get = new Get(rowkey);
+                if (filter != null) {
+                    get.setFilter(filter);
+                }
+                if(qualifiers != null) {
+                    for (int j = 0; j < qualifiers.length; ++j) {
+                        // Return the specified qualifiers
+                        get.addColumn(family, qualifiers[j]);
+                    }
+                }else {
+                    get.addFamily(family);
+                }
+                workaroundHBASE2198(get, filter,qualifiers);
+                gets.add(get);
+                ++count;
+            }
+        }
+        if (count == 0) {
+            results = null;
+            return;
+        }
+        results = tbl.get(gets);
+        if (results == null || results.length == 0) {
+            fillResults();
+        }
+    }
+
+
+    /** Closes and forgets the active index scanner, if there is one. */
+    private void closeCurrentScanResult() {
+        if (currentResultScanner != null) {
+            currentResultScanner.close();
+            currentResultScanner = null;
+        }
+    }
+
+
+    /**
+     * Closes the active scanner first and then releases the table, so the scanner is never
+     * closed against an already released table. Afterwards the reader can be opened again.
+     *
+     * @throws IOException if releasing the table fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            closeCurrentScanResult();
+        } finally {
+            try {
+                if (tbl != null) {
+                    new HTableFactory().releaseHTableInterface(tbl);
+                }
+            } finally {
+                isOpen = false;
+            }
+        }
+    }
+
+    /**
+     * Returns the next entity, skipping results without a row (entity not found or filtered out).
+     *
+     * @return the next parsed log, or {@code null} when everything has been read
+     * @throws IllegalArgumentException if the reader was never opened
+     * @throws IOException if fetching the next batch fails
+     */
+    @Override
+    public InternalLog read() throws IOException {
+        if (tbl == null) {
+            throw new IllegalArgumentException("Haven't open before reading");
+        }
+
+        Result r;
+        InternalLog t = null;
+        while ((r = getNextResult()) != null) {
+            if (r.getRow() == null) {
+                continue;
+            }
+            t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers);
+            break;
+        }
+        return t;
+    }
+
+
+    /**
+     * Hands out the next result of the current batch, loading a new batch when the current
+     * one is used up.
+     *
+     * @return the next result, or {@code null} if no further batch could be loaded
+     */
+    private Result getNextResult() throws IOException {
+        if (results == null || results.length == 0 || index >= results.length) {
+            fillResults();
+        }
+        if (results == null || results.length == 0 || index >= results.length) {
+            return null;
+        }
+        return results[index++];
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
new file mode 100755
index 0000000..ec5631a
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link IndexStreamReader} whose underlying reader is a {@link NonClusteredIndexLogReader}.
+ */
+public class NonClusteredIndexStreamReader extends IndexStreamReader {
+    /**
+     * Derives the index row keys from the condition's query expression: the (initially empty)
+     * row key list is passed to {@code canGoThroughIndex}.
+     *
+     * @throws IllegalArgumentException if the index definition does not report
+     *         {@code NON_CLUSTER_INDEX} for the query expression
+     */
+    public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
+        super(indexDef, condition, new ArrayList<byte[]>());
+        final IndexType matched = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
+        if (matched != IndexType.NON_CLUSTER_INDEX) {
+            throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression());
+        }
+    }
+
+    /**
+     * Uses index row keys that the caller already determined; no index-type check is done.
+     */
+    public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+        super(indexDef, condition, indexRowkeys);
+    }
+
+    /**
+     * @return a {@link NonClusteredIndexLogReader} limited to the condition's output fields,
+     *         or reading all qualifiers ({@code null}) when the condition asks for full output
+     */
+    @Override
+    protected LogReader createIndexReader() {
+        final EntityDefinition entityDef = indexDef.getEntityDefinition();
+        final byte[][] outputQualifiers = condition.isOutputAll()
+                ? null
+                : HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields());
+        return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
new file mode 100755
index 0000000..1c16dc8
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+
+public class RowKeyLogReader extends IndexLogReader {
+ private final EntityDefinition ed;
+ private final List<byte[]> rowkeys;
+ private final byte[][] qualifiers;
+ private HTableInterface tbl;
+ private boolean isOpen = false;
+ private Result[] entityResult;
+ private int getIndex = -1;
+
+ public RowKeyLogReader(EntityDefinition ed, byte[] rowkey) {
+ this.ed = ed;
+ this.rowkeys = new ArrayList<>();
+ this.rowkeys.add(rowkey);
+ this.qualifiers = null;
+ }
+
+ public RowKeyLogReader(EntityDefinition ed, byte[] rowkey,byte[][] qualifiers) {
+ this.ed = ed;
+ this.rowkeys = new ArrayList<>();
+ this.rowkeys.add(rowkey);
+ this.qualifiers = qualifiers;
+ }
+
+ public RowKeyLogReader(EntityDefinition ed, List<byte[]> rowkeys,byte[][] qualifiers) {
+ this.ed = ed;
+ this.rowkeys = rowkeys;
+ this.qualifiers = qualifiers;
+ }
+
+ @Override
+ public void open() throws IOException {
+ if (isOpen)
+ return; // silently return
+ try {
+ tbl = EagleConfigFactory.load().getHTable(ed.getTable());
+ } catch (RuntimeException ex) {
+ throw new IOException(ex);
+ }
+ final byte[] family = ed.getColumnFamily().getBytes();
+ List<Get> gets = new ArrayList<>(this.rowkeys.size());
+
+ for(byte[] rowkey:rowkeys) {
+ Get get = new Get(rowkey);
+ get.addFamily(family);
+
+ if(qualifiers != null) {
+ for(byte[] qualifier: qualifiers){
+ get.addColumn(family,qualifier);
+ }
+ }
+
+ gets.add(get);
+ }
+
+ entityResult = tbl.get(gets);
+ isOpen = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(tbl != null){
+ new HTableFactory().releaseHTableInterface(tbl);
+ }
+ }
+
+ @Override
+ public InternalLog read() throws IOException {
+ if(entityResult == null || entityResult.length == 0 || this.getIndex >= entityResult.length - 1){
+ return null;
+ }
+ getIndex ++;
+ InternalLog t = HBaseInternalLogHelper.parse(ed, entityResult[getIndex], this.qualifiers);
+ return t;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
new file mode 100755
index 0000000..8ff3448
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableFactory;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+
+/**
+ * Reads entities through a unique index: the index rows are fetched first, the qualifier
+ * names found in them are used as entity row keys, and the entities are fetched with a
+ * second multi-get. Both steps happen in {@link #open()}.
+ */
+public class UniqueIndexLogReader extends IndexLogReader {
+
+    private final IndexDefinition indexDef;
+    private final List<byte[]> indexRowkeys;
+    private final byte[][] qualifiers;
+    private final Filter filter;
+    private HTableInterface tbl;
+    private boolean isOpen = false;
+    private Result[] entityResults;
+    // Position of the result inspected last; -1 before the first read
+    private int index = -1;
+
+    /**
+     * @param indexDef     index definition; also provides table and column family
+     * @param indexRowkeys row keys of the index rows to look up
+     * @param qualifiers   entity qualifiers to return, or {@code null} for the whole family
+     * @param filter       optional filter applied to the entity Gets; may be {@code null}
+     */
+    public UniqueIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) {
+        this.indexDef = indexDef;
+        this.indexRowkeys = indexRowkeys;
+        this.qualifiers = qualifiers;
+        this.filter = filter;
+    }
+
+    /**
+     * Acquires the table, resolves the index rows to entity row keys and fetches the entities.
+     * A second call after a successful one returns silently.
+     *
+     * @throws IOException if the table cannot be obtained or a multi-get fails
+     */
+    @Override
+    public void open() throws IOException {
+        if (isOpen) {
+            return; // silently return
+        }
+        try {
+            tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
+        } catch (RuntimeException ex) {
+            throw new IOException(ex);
+        }
+        final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes();
+
+        // Step 1: fetch the index rows, returning all index qualifiers
+        final List<Get> indexGets = new ArrayList<>();
+        for (byte[] indexRowkey : indexRowkeys) {
+            final Get indexGet = new Get(indexRowkey);
+            indexGet.addFamily(family);
+            indexGets.add(indexGet);
+        }
+        final Result[] indexResults = tbl.get(indexGets);
+
+        // Step 2: every qualifier name of an index row is the row key of an entity
+        final List<Get> entityGets = new ArrayList<>();
+        for (Result indexResult : indexResults) {
+            final NavigableMap<byte[], byte[]> entityRowkeys = indexResult.getFamilyMap(family);
+            if (entityRowkeys != null) {
+                for (byte[] entityRowkey : entityRowkeys.keySet()) {
+                    entityGets.add(buildEntityGet(entityRowkey, family));
+                }
+            }
+        }
+        entityResults = tbl.get(entityGets);
+        isOpen = true;
+    }
+
+    /** Builds the Get for one entity row, applying the filter and the output qualifiers. */
+    private Get buildEntityGet(byte[] entityRowkey, byte[] family) throws IOException {
+        final Get entityGet = new Get(entityRowkey);
+        if (filter != null) {
+            entityGet.setFilter(filter);
+        }
+        if (qualifiers != null) {
+            // Return the specified qualifiers
+            for (byte[] qualifier : qualifiers) {
+                entityGet.addColumn(family, qualifier);
+            }
+        } else {
+            // no output qualifiers given: request the whole family
+            entityGet.addFamily(family);
+        }
+        workaroundHBASE2198(entityGet, filter,qualifiers);
+        return entityGet;
+    }
+
+    /** Releases the table if one was acquired. */
+    @Override
+    public void close() throws IOException {
+        if (tbl != null) {
+            new HTableFactory().releaseHTableInterface(tbl);
+        }
+    }
+
+    /**
+     * Returns the next entity, skipping {@code null} results and results without a row.
+     *
+     * @return the next parsed log, or {@code null} when all results have been consumed
+     * @throws IllegalArgumentException if {@link #open()} has not populated the results yet
+     */
+    @Override
+    public InternalLog read() throws IOException {
+        if (entityResults == null) {
+            throw new IllegalArgumentException("entityResults haven't been initialized before reading");
+        }
+        for (++index; index < entityResults.length; ++index) {
+            final Result candidate = entityResults[index];
+            if (candidate == null || candidate.getRow() == null) {
+                continue;
+            }
+            return HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), candidate, qualifiers);
+        }
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
new file mode 100755
index 0000000..0391d57
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.index;
+
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.LogReader;
+import org.apache.eagle.log.entity.SearchCondition;
+import org.apache.eagle.log.entity.meta.EntityDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition;
+import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link IndexStreamReader} whose index reader is a {@code UniqueIndexLogReader}.
+ */
+public class UniqueIndexStreamReader extends IndexStreamReader {
+
+    /**
+     * Creates a reader starting from an empty rowkey list and checks that the
+     * condition's query expression can be served by a unique index.
+     *
+     * @param indexDef  index definition consulted via {@code canGoThroughIndex}
+     * @param condition search condition supplying the query expression
+     * @throws IllegalArgumentException if the index definition does not report
+     *         {@code UNIQUE_INDEX} for the query expression
+     */
+    public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
+        super(indexDef, condition, new ArrayList<byte[]>());
+        // NOTE(review): canGoThroughIndex presumably fills indexRowkeys as a side effect - confirm.
+        final IndexType resolvedType = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
+        final boolean servedByUniqueIndex = IndexType.UNIQUE_INDEX.equals(resolvedType);
+        if (!servedByUniqueIndex) {
+            throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression());
+        }
+    }
+
+    /**
+     * Creates a reader over rowkeys supplied by the caller; no index check is performed here.
+     *
+     * @param indexDef     index definition
+     * @param condition    search condition
+     * @param indexRowkeys index rowkeys handed to the superclass
+     */
+    public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
+        super(indexDef, condition, indexRowkeys);
+    }
+
+    /**
+     * Builds the underlying log reader. Output qualifiers are left {@code null}
+     * when the condition asks for all output; otherwise they are derived from
+     * the condition's output fields.
+     */
+    @Override
+    protected LogReader createIndexReader() {
+        final EntityDefinition definition = indexDef.getEntityDefinition();
+        final byte[][] selectedQualifiers = condition.isOutputAll()
+                ? null
+                : HBaseInternalLogHelper.getOutputQualifiers(definition, condition.getOutputFields());
+        return new UniqueIndexLogReader(indexDef, indexRowkeys, selectedQualifiers, condition.getFilter());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
new file mode 100755
index 0000000..cf40e31
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+/**
+ * @since : 7/3/14,2014
+ */
+public class BooleanSerDeser implements EntitySerDeser<Boolean> {
+
+ public BooleanSerDeser(){}
+
+ @Override
+ public Boolean deserialize(byte[] bytes){
+ if(bytes != null && bytes.length > 0){
+ if(bytes[0] == 0){
+ return false;
+ }else if(bytes[0] == 1){
+ return true;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public byte[] serialize(Boolean obj){
+ if(obj != null){
+ if(obj){
+ return new byte[]{1};
+ }else{
+ return new byte[]{0};
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Class<Boolean> type() {
+ return Boolean.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
new file mode 100644
index 0000000..b64e528
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation, retained at runtime, carrying a single string value
+ * that defaults to the empty string.
+ * NOTE(review): presumably the storage column name the annotated field maps to - confirm against the code that reads it.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface Column {
+    /** The annotation's value; empty string when not specified. */
+    String value() default "";
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
new file mode 100644
index 0000000..6e3e9c6
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation, retained at runtime, carrying a single string value
+ * that defaults to {@code "f"}.
+ * NOTE(review): presumably the storage column family used for the annotated entity - confirm against the code that reads it.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ColumnFamily {
+    /** The annotation's value; {@code "f"} when not specified. */
+    String value() default "f";
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
new file mode 100644
index 0000000..27b011c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Serializes a {@code double[][]} into a length-prefixed byte layout and back.
+ *
+ * <p>Layout: an int row count, then for every row an int column count followed by
+ * that many encoded doubles. A {@code null} row is written as column count {@code -1}
+ * with no payload. Int and double encoding is delegated to {@code ByteUtil}.
+ *
+ * @since 7/22/15
+ */
+public class Double2DArraySerDeser implements EntitySerDeser<double[][]> {
+    /** Bytes occupied by one encoded double. */
+    private static final int SIZE = 8;
+    /** Bytes occupied by one encoded int length prefix. */
+    private static final int INT_SIZE = 4;
+
+    /**
+     * Decodes a payload produced by {@link #serialize(double[][])}.
+     *
+     * @param bytes encoded matrix, may be null
+     * @return the decoded matrix (rows written with a negative column count come back
+     *         as {@code null}), or {@code null} when the input is null or too short to
+     *         hold the row-count prefix
+     */
+    @Override
+    public double[][] deserialize(byte[] bytes) {
+        // Mirror serialize(null) -> null, and refuse input that cannot hold the row count.
+        if (bytes == null || bytes.length < INT_SIZE) {
+            return null;
+        }
+        int offset = 0;
+        final int rowSize = ByteUtil.bytesToInt(bytes, offset);
+        offset += INT_SIZE;
+
+        final double[][] data = new double[rowSize][];
+        for (int i = 0; i < rowSize; i++) {
+            final int colSize = ByteUtil.bytesToInt(bytes, offset);
+            offset += INT_SIZE;
+            double[] values = null;
+            // A negative column count marks a null row; it has no payload to skip.
+            if (colSize >= 0) {
+                values = new double[colSize];
+                for (int j = 0; j < colSize; j++) {
+                    values[j] = ByteUtil.bytesToDouble(bytes, offset);
+                    offset += SIZE;
+                }
+            }
+            data[i] = values;
+        }
+        return data;
+    }
+
+    /**
+     * Encodes the matrix using the layout described on the class.
+     *
+     * @param obj matrix to encode, may be null; individual rows may be null
+     * @return the encoded bytes, or {@code null} when {@code obj} is null
+     */
+    @Override
+    public byte[] serialize(double[][] obj) {
+        if (obj == null) {
+            return null;
+        }
+        // Only the (byte[], int, int) overload is used: on ByteArrayOutputStream it does not
+        // declare IOException, so no error can be swallowed and leave a truncated payload.
+        final ByteArrayOutputStream data = new ByteArrayOutputStream();
+        final byte[] rowCount = ByteUtil.intToBytes(obj.length);
+        data.write(rowCount, 0, rowCount.length);
+
+        for (double[] row : obj) {
+            if (row == null) {
+                data.write(ByteUtil.intToBytes(-1), 0, INT_SIZE);
+                continue;
+            }
+            final byte[] colCount = ByteUtil.intToBytes(row.length);
+            data.write(colCount, 0, colCount.length);
+            for (double d : row) {
+                data.write(ByteUtil.doubleToBytes(d), 0, SIZE);
+            }
+        }
+        // Closing a ByteArrayOutputStream has no effect, so no close() is needed.
+        return data.toByteArray();
+    }
+
+    @Override
+    public Class<double[][]> type() {
+        return double[][].class;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
new file mode 100755
index 0000000..d87e31c
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Serializes a {@code double[]} as an int element count followed by that many
+ * encoded doubles. Int and double encoding is delegated to {@code ByteUtil}.
+ */
+public class DoubleArraySerDeser implements EntitySerDeser<double[]> {
+    /** Bytes occupied by one encoded double. */
+    private static final int SIZE = 8;
+    /** Bytes occupied by the encoded int length prefix when reading. */
+    private static final int INT_SIZE = 4;
+
+    public DoubleArraySerDeser() {
+    }
+
+    /**
+     * Decodes a payload produced by {@link #serialize(double[])}.
+     *
+     * @param bytes encoded array, may be null
+     * @return the decoded values, or {@code null} when the input is null, its length is
+     *         not {@code 4 + 8*k}, or its length prefix is negative or exceeds the
+     *         number of doubles actually present
+     */
+    @Override
+    public double[] deserialize(byte[] bytes) {
+        if (bytes == null || bytes.length < INT_SIZE || (bytes.length - INT_SIZE) % SIZE != 0) {
+            return null;
+        }
+        int offset = 0;
+        final int size = ByteUtil.bytesToInt(bytes, offset);
+        offset += INT_SIZE;
+        // A corrupt prefix must not trigger a negative allocation or a read past the payload.
+        final int available = (bytes.length - INT_SIZE) / SIZE;
+        if (size < 0 || size > available) {
+            return null;
+        }
+        final double[] values = new double[size];
+        for (int i = 0; i < size; i++) {
+            values[i] = ByteUtil.bytesToDouble(bytes, offset);
+            offset += SIZE;
+        }
+        return values;
+    }
+
+    /**
+     * Encodes the array as its length followed by each element.
+     *
+     * @param obj values to encode, may be null
+     * @return the encoded bytes, or {@code null} when {@code obj} is null
+     */
+    @Override
+    public byte[] serialize(double[] obj) {
+        if (obj == null) {
+            return null;
+        }
+        final int size = obj.length;
+        final byte[] array = new byte[4 + SIZE * size];
+        final byte[] first = ByteUtil.intToBytes(size);
+        int offset = 0;
+        System.arraycopy(first, 0, array, offset, first.length);
+        offset += first.length;
+        for (int i = 0; i < size; i++) {
+            System.arraycopy(ByteUtil.doubleToBytes(obj[i]), 0, array, offset, SIZE);
+            offset += SIZE;
+        }
+        return array;
+    }
+
+    @Override
+    public Class<double[]> type() {
+        return double[].class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
new file mode 100755
index 0000000..330a99d
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.ByteUtil;
+
+/**
+ * Converts between {@link Double} and its byte encoding, delegating the
+ * encoding itself to {@code ByteUtil}.
+ */
+public class DoubleSerDeser implements EntitySerDeser<Double> {
+    /** Minimum number of bytes required to decode a double. */
+    private static final int SIZE = 8;
+
+    /**
+     * Decodes a double from the given bytes.
+     *
+     * @param bytes encoded value, may be null
+     * @return the decoded value, or {@code null} when the input is null or shorter than 8 bytes
+     */
+    @Override
+    public Double deserialize(byte[] bytes) {
+        // A null input is treated like a too-short one instead of raising a NullPointerException.
+        if (bytes == null || bytes.length < SIZE) {
+            return null;
+        }
+        return ByteUtil.bytesToDouble(bytes);
+    }
+
+    /**
+     * Encodes the given double.
+     *
+     * @param obj value to encode, may be null
+     * @return the encoded bytes, or {@code null} when {@code obj} is null
+     */
+    @Override
+    public byte[] serialize(Double obj) {
+        if (obj == null) {
+            return null;
+        }
+        return ByteUtil.doubleToBytes(obj);
+    }
+
+    @Override
+    public Class<Double> type() {
+        return Double.class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
new file mode 100644
index 0000000..930743e
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+/**
+ * Fixed date/time constants: one "write" time and a "read" start/end pair,
+ * plus the write time expressed as a numeric timestamp.
+ */
+public class EntityConstants {
+
+ /** Fixed write time as a human-readable date string. */
+ public static final String FIXED_WRITE_HUMANTIME = "1970-01-02 00:00:00";
+ /** Fixed read-range start as a human-readable date string (one day before the write time). */
+ public static final String FIXED_READ_START_HUMANTIME = "1970-01-01 00:00:00";
+ /** Fixed read-range end as a human-readable date string (one day after the write time). */
+ public static final String FIXED_READ_END_HUMANTIME = "1970-01-03 00:00:00";
+
+ /**
+ * {@link #FIXED_WRITE_HUMANTIME} converted with
+ * {@code DateTimeUtil.humanDateToSecondsWithoutException} and multiplied by 1000
+ * (presumably seconds to milliseconds - confirm against DateTimeUtil).
+ */
+ public static final long FIXED_WRITE_TIMESTAMP =
+ DateTimeUtil.humanDateToSecondsWithoutException(FIXED_WRITE_HUMANTIME) * 1000;
+
+}